Browse Source

Major refactor of network code

Here I massively change both the external and internal API of the TCP
sockets and servers.

This change introduces the concept of a protocol object like is found in
Twisted Python. I believe this allows for a much cleaner description of how
a socket behaves. What was once a single object "client" or "connection" is
now represented by two objects: a "connection" and a "protocol".

Well - I don't want to ramble too much because neither API is yet public or
documented.  Look the diff of test/test-pingpong.js to see how things have
changed.
v0.7.4-release
Ryan 16 years ago
parent
commit
15d24d8002
  1. 594
      src/net.cc
  2. 138
      src/net.h
  3. 28
      src/node.cc
  4. 77
      test/test-pingpong.js

594
src/net.cc

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <strings.h> #include <strings.h>
#include <errno.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -12,9 +13,20 @@
using namespace v8; using namespace v8;
using namespace node; using namespace node;
#define ON_CONNECT_SYMBOL String::NewSymbol("onConnect") #define ON_RECEIVE_SYMBOL String::NewSymbol("onReceive")
#define ON_CONNECTION_SYMBOL String::NewSymbol("onConnection") #define ON_DISCONNECT_SYMBOL String::NewSymbol("onDisconnect")
#define ON_READ_SYMBOL String::NewSymbol("onRead") #define ON_CONNECT_SYMBOL String::NewSymbol("onConnect")
#define ON_DRAIN_SYMBOL String::NewSymbol("onDrain")
#define ON_TIMEOUT_SYMBOL String::NewSymbol("onTimeout")
#define ON_ERROR_SYMBOL String::NewSymbol("onError")
#define SEND_SYMBOL String::NewSymbol("send")
#define DISCONNECT_SYMBOL String::NewSymbol("disconnect")
#define CONNECT_SYMBOL String::NewSymbol("connect")
#define ENCODING_SYMBOL String::NewSymbol("encoding")
#define TIMEOUT_SYMBOL String::NewSymbol("timeout")
#define PROTOCOL_SYMBOL String::NewSymbol("protocol")
static const struct addrinfo tcp_hints = static const struct addrinfo tcp_hints =
/* ai_flags */ { AI_PASSIVE /* ai_flags */ { AI_PASSIVE
@ -27,325 +39,194 @@ static const struct addrinfo tcp_hints =
/* ai_next */ , NULL /* ai_next */ , NULL
}; };
Persistent<Function> tcp_connection_constructor;
Server::Server (Handle<Object> handle, int backlog) void
: ObjectWrap(handle) Connection::Initialize (v8::Handle<v8::Object> target)
{
//HandleScope scope;
oi_server_init(&server_, backlog);
server_.on_connection = Server::OnConnection;
// server_.on_error = Server::OnError;
server_.data = this;
}
Server::~Server ()
{
//HandleScope scope;
oi_server_close(&server_);
oi_server_detach(&server_);
}
Handle<Value>
Server::New (const Arguments& args)
{ {
HandleScope scope; HandleScope scope;
int backlog = 1024; // default Local<FunctionTemplate> t = FunctionTemplate::New(Connection::v8New);
if (args.Length() > 0 && args[0]->IsNumber()) t->InstanceTemplate()->SetInternalFieldCount(1);
backlog = args[0]->IntegerValue(); target->Set(String::NewSymbol("TCPConnection"), t->GetFunction());
new Server(args.Holder(), backlog); tcp_connection_constructor = Persistent<Function>::New(t->GetFunction());
return args.This(); NODE_SET_METHOD(t->InstanceTemplate(), "connect", Connection::v8Connect);
NODE_SET_METHOD(t->InstanceTemplate(), "disconnect", Connection::v8Disconnect);
NODE_SET_METHOD(t->InstanceTemplate(), "send", Connection::v8Send);
} }
Handle<Value> Local<Object>
Server::ListenTCP (const Arguments& args) Connection::NewInstance (Local<Function> protocol)
{ {
if (args.Length() < 2) return Undefined();
HandleScope scope; HandleScope scope;
Handle<Value> argv[] = { protocol };
Local<Object> instance = tcp_connection_constructor->NewInstance(1, argv);
return scope.Close(instance);
}
Server *server = NODE_UNWRAP(Server, args.Holder()); Connection::Connection (Handle<Object> handle)
: ObjectWrap(handle)
{
HandleScope scope;
String::AsciiValue port(args[0]); Local<Object> protocol = GetProtocol();
int callback_index = 1; encoding_ = RAW;
char *host = NULL; Local<Value> encoding_v = protocol->Get(ENCODING_SYMBOL);
if (args[1]->IsString()) { if (encoding_v->IsString()) {
callback_index = 2; Local<String> encoding_string = encoding_v->ToString();
String::AsciiValue host_v(args[1]->ToString()); char buf[5]; // need enough room for "utf8" or "raw"
if(args[1]->IsString()) host = *host_v; encoding_string->WriteAscii(buf, 0, 4);
buf[4] = '\0';
if(strcasecmp(buf, "utf8") == 0) encoding_ = UTF8;
} }
// For servers call getaddrinfo inline. This is blocking but it shouldn't double timeout = 0.0; // default
// matter--ever. If someone actually complains then simply swap it out Local<Value> timeout_v = protocol->Get(TIMEOUT_SYMBOL);
// with a libeio call. if (encoding_v->IsInt32())
struct addrinfo *address = NULL; timeout = timeout_v->Int32Value() / 1000.0;
int r = getaddrinfo(host, *port, &tcp_hints, &address);
if (r != 0)
return ThrowException(String::New("Error looking up hostname"));
if (!args[callback_index]->IsFunction())
return ThrowException(String::New("Must supply onConnection callback"));
server->handle_->Set(ON_CONNECTION_SYMBOL, args[callback_index]);
r = oi_server_listen(&server->server_, address); host_ = NULL;
if (r != 0) port_ = NULL;
return ThrowException(String::New("Error listening on port"));
oi_server_attach(EV_DEFAULT_UC_ &server->server_);
freeaddrinfo(address);
return Undefined();
}
Handle<Value> oi_socket_init(&socket_, timeout);
Server::Close (const Arguments& args) socket_.on_connect = Connection::_OnConnect;
{ socket_.on_read = Connection::_OnReceive;
HandleScope scope; socket_.on_drain = Connection::_OnDrain;
Server *server = NODE_UNWRAP(Server, args.Holder()); socket_.on_error = Connection::_OnError;
oi_server_close(&server->server_); socket_.on_close = Connection::_OnDisconnect;
return Undefined(); socket_.on_timeout = Connection::_OnTimeout;
socket_.data = this;
} }
oi_socket* Local<Object>
Server::OnConnection (oi_server *s, struct sockaddr *remote_addr, socklen_t remote_addr_len) Connection::GetProtocol (void)
{ {
Server *server = static_cast<Server*> (s->data);
HandleScope scope; HandleScope scope;
Socket *socket = Socket::NewConnection(60.0); Local<Value> protocol_v = handle_->Get(PROTOCOL_SYMBOL);
if (protocol_v->IsObject()) {
Local<Value> callback_v = server->handle_->Get(ON_CONNECTION_SYMBOL); Local<Object> protocol = protocol_v->ToObject();
if (!callback_v->IsFunction()) return scope.Close(protocol);
return NULL; // produce error? }
Local<Function> callback = Local<Function>::Cast(callback_v);
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Local<Value>::New(socket->handle_);
callback->Call(server->handle_, argc, argv);
return &socket->socket_; return Local<Object>();
} }
Handle<Value> Handle<Value>
Socket::New(const Arguments& args) Connection::v8New (const Arguments& args)
{ {
if (args.Length() > 1)
return Undefined();
HandleScope scope; HandleScope scope;
// Default options if (args[0]->IsFunction() == false)
double timeout = 60.0; // in seconds return ThrowException(String::New("Must pass a class as the first argument."));
enum {UTF8, RAW} encoding ;
// Set options from argument. Handle<Function> protocol = Handle<Function>::Cast(args[0]);
if (args.Length() == 1 && args[0]->IsObject()) { Handle<Value> argv[] = { args.This() };
Local<Object> options = args[0]->ToObject(); Local<Object> protocol_instance = protocol->NewInstance(1, argv);
Local<Value> timeout_value = options->Get(String::NewSymbol("timeout")); args.This()->Set(PROTOCOL_SYMBOL, protocol_instance);
Local<Value> encoding_value = options->Get(String::NewSymbol("encoding"));
if (timeout_value->IsNumber()) { new Connection(args.This());
// timeout is specified in milliseconds like other time
// values in javascript
timeout = timeout_value->NumberValue() / 1000;
}
if (encoding_value->IsString()) {
Local<String> encoding_string = encoding_value->ToString();
char buf[5]; // need enough room for "utf8" or "raw"
encoding_string->WriteAscii(buf, 0, 4);
buf[4] = '\0';
if(strcasecmp(buf, "utf8") == 0) encoding = UTF8;
}
}
new Socket(args.Holder(), timeout);
return args.This(); return args.This();
} }
void
Socket::SetEncoding (Handle<Value> encoding_value)
{
if (encoding_value->IsString()) {
HandleScope scope;
Local<String> encoding_string = encoding_value->ToString();
char buf[5]; // need enough room for "utf8" or "raw"
encoding_string->WriteAscii(buf, 0, 4);
buf[4] = '\0';
if(strcasecmp(buf, "utf8") == 0)
encoding_ = UTF8;
else
encoding_ = RAW;
}
}
Handle<Value> Handle<Value>
Socket::ConnectTCP (const Arguments& args) Connection::v8Connect (const Arguments& args)
{ {
if (args.Length() < 1)
return Undefined();
HandleScope scope; HandleScope scope;
Socket *socket = NODE_UNWRAP(Socket, args.Holder()); Connection *connection = NODE_UNWRAP(Connection, args.Holder());
String::AsciiValue port(args[0]); if (args.Length() == 0 || args[0]->IsInt32() == false)
socket->port_ = strdup(*port); return ThrowException(String::New("Must specify a port."));
assert(socket->host_ == NULL); String::AsciiValue port_sv(args[0]->ToString());
String::AsciiValue host_v(args[1]->ToString()); assert(connection->port_ == NULL);
if(args[1]->IsString()) { connection->port_ = strdup(*port_sv);
socket->host_ = strdup(*host_v);
}
if(args[2]->IsFunction()) { assert(connection->host_ == NULL);
socket->handle_->Set(ON_CONNECT_SYMBOL , args[2]); if (args.Length() > 1 && args[1]->IsString()) {
String::Utf8Value host_sv(args[1]->ToString());
connection->host_ = strdup(*host_sv);
} }
/* For the moment I will do DNS lookups in the thread pool. This is /* For the moment I will do DNS lookups in the eio thread pool. This is
* sub-optimal and cannot handle massive numbers of requests but it is * sub-optimal and cannot handle massive numbers of requests but it is
* quite portable. * quite portable.
* In the future I will move to a system using adns or udns: * In the future I will move to a system using adns or udns:
* http://lists.schmorp.de/pipermail/libev/2009q1/000632.html * http://lists.schmorp.de/pipermail/libev/2009q1/000632.html
*/ */
eio_warmup(); eio_warmup();
eio_req *req = eio_custom (Socket::Resolve, EIO_PRI_DEFAULT, Socket::AfterResolve, socket); eio_req *req = eio_custom( Connection::Resolve
, EIO_PRI_DEFAULT
, Connection::AfterResolve
, connection
);
return Undefined(); return Undefined();
} }
/* This function is executed in the thread pool. It cannot touch anything! */
int int
Socket::Resolve (eio_req *req) Connection::Resolve (eio_req *req)
{ {
Socket *socket = static_cast<Socket*> (req->data); Connection *connection = static_cast<Connection*> (req->data);
struct addrinfo *address = NULL; struct addrinfo *address = NULL;
req->result = getaddrinfo(connection->host_, connection->port_, &tcp_hints, &address);
req->result = getaddrinfo(socket->host_, socket->port_, &tcp_hints, &address);
req->ptr2 = address; req->ptr2 = address;
free(socket->host_); free(connection->host_);
socket->host_ = NULL; connection->host_ = NULL;
free(socket->port_); free(connection->port_);
socket->port_ = NULL; connection->port_ = NULL;
return 0; return 0;
} }
int int
Socket::AfterResolve (eio_req *req) Connection::AfterResolve (eio_req *req)
{ {
Socket *socket = static_cast<Socket*> (req->data); Connection *connection = static_cast<Connection*> (req->data);
struct addrinfo *address = static_cast<struct addrinfo *>(req->ptr2); struct addrinfo *address = static_cast<struct addrinfo *>(req->ptr2);
req->ptr2 = NULL; req->ptr2 = NULL;
int r = 0; int r = 0;
if (req->result == 0) { if (req->result == 0) {
r = oi_socket_connect (&socket->socket_, address); r = connection->Connect(address);
} }
if (address)
freeaddrinfo(address); if (address) freeaddrinfo(address);
// no error. return. // no error. return.
if(r == 0 && req->result == 0) { if (r == 0 && req->result == 0) {
oi_socket_attach (EV_DEFAULT_UC_ &socket->socket_); oi_socket_attach (EV_DEFAULT_UC_ &connection->socket_);
return 0; return 0;
} }
HandleScope scope; // return error?
Handle<Value> onconnect_value = socket->handle_->Get(ON_CONNECT_SYMBOL);
if (!onconnect_value->IsFunction()) return 0;
Handle<Function> onconnect = Handle<Function>::Cast(onconnect_value);
TryCatch try_catch;
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Integer::New(r | req->result); // FIXME very stupid error code.
onconnect->Call(socket->handle_, argc, argv);
if(try_catch.HasCaught())
fatal_exception(try_catch);
return 0;
}
Handle<Value>
Socket::Close (const Arguments& args)
{
HandleScope scope;
Socket *socket = NODE_UNWRAP(Socket, args.Holder());
oi_socket_close(&socket->socket_);
return Undefined();
}
Socket::Socket(Handle<Object> handle, double timeout)
: ObjectWrap(handle)
{
//HandleScope scope;
oi_socket_init(&socket_, timeout);
socket_.on_connect = Socket::OnConnect;
socket_.on_read = Socket::OnRead;
// socket_.on_drain = Socket::OnDrain;
// socket_.on_error = Socket::OnError;
socket_.on_close = Socket::OnClose;
socket_.on_timeout = Socket::OnTimeout;
socket_.data = this;
encoding_ = UTF8; // default encoding.
host_ = NULL;
port_ = NULL;
}
Socket* return r | req->result;
Socket::NewConnection (double timeout)
{
HandleScope scope;
Local<Object> socket_handle = socket_template->GetFunction()->NewInstance();
Socket *socket = new Socket(socket_handle, 60.0);
socket->handle_->Delete(String::NewSymbol("connectTCP"));
return socket;
}
Socket::~Socket ()
{
oi_socket_close(&socket_);
oi_socket_detach(&socket_);
free(host_);
free(port_);
//HandleScope scope;
handle_->Delete(String::NewSymbol("write"));
handle_->Delete(String::NewSymbol("close"));
} }
Handle<Value> Handle<Value>
Socket::SetEncoding (const Arguments& args) Connection::v8Disconnect (const Arguments& args)
{ {
HandleScope scope; HandleScope scope;
Socket *socket = NODE_UNWRAP(Socket, args.Holder()); Connection *connection = NODE_UNWRAP(Connection, args.Holder());
socket->SetEncoding(args[0]); connection->Disconnect();
return Undefined(); return Undefined();
} }
Handle<Value> Handle<Value>
Socket::Write (const Arguments& args) Connection::v8Send (const Arguments& args)
{ {
HandleScope scope; HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
Socket *socket = NODE_UNWRAP(Socket, args.Holder());
// TODO support a callback using buf->on_release
if (args[0] == Null()) { if (args[0] == Null()) {
oi_socket_write_eof(&socket->socket_); oi_socket_write_eof(&connection->socket_);
} else if (args[0]->IsString()) { } else if (args[0]->IsString()) {
// utf8 encoding // utf8 encoding
@ -353,191 +234,216 @@ Socket::Write (const Arguments& args)
size_t length = s->Utf8Length(); size_t length = s->Utf8Length();
oi_buf *buf = oi_buf_new2(length); oi_buf *buf = oi_buf_new2(length);
s->WriteUtf8(buf->base, length); s->WriteUtf8(buf->base, length);
oi_socket_write(&socket->socket_, buf); connection->Send(buf);
} else if (args[0]->IsArray()) { } else if (args[0]->IsArray()) {
// raw encoding // raw encoding
Handle<Array> array = Handle<Array>::Cast(args[0]); Handle<Array> array = Handle<Array>::Cast(args[0]);
size_t length = array->Length(); size_t length = array->Length();
oi_buf *buf = oi_buf_new2(length); oi_buf *buf = oi_buf_new2(length);
for (int i = 0; i < length; i++) { for (size_t i = 0; i < length; i++) {
Local<Value> int_value = array->Get(Integer::New(i)); Local<Value> int_value = array->Get(Integer::New(i));
buf->base[i] = int_value->IntegerValue(); buf->base[i] = int_value->IntegerValue();
} }
oi_socket_write(&socket->socket_, buf); connection->Send(buf);
} else return ThrowException(String::New("Bad argument")); } else return ThrowException(String::New("Bad argument"));
return Undefined(); return Undefined();
} }
void void
Socket::OnConnect (oi_socket *s) Connection::OnReceive (const void *buf, size_t len)
{ {
Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> on_connect_value = socket->handle_->Get(ON_CONNECT_SYMBOL); Local<Object> protocol = GetProtocol();
if (!on_connect_value->IsFunction()) Handle<Value> callback_v = protocol->Get(ON_RECEIVE_SYMBOL);
return; if (!callback_v->IsFunction()) return;
Handle<Function> on_connect = Handle<Function>::Cast(on_connect_value);
TryCatch try_catch;
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Integer::New(0);
Handle<Value> r = on_connect->Call(socket->handle_, argc, argv);
if(try_catch.HasCaught()) Handle<Function> callback = Handle<Function>::Cast(callback_v);
fatal_exception(try_catch);
}
void
Socket::OnRead (oi_socket *s, const void *buf, size_t count)
{
Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope;
Handle<Value> onread_value = socket->handle_->Get(ON_READ_SYMBOL);
if (!onread_value->IsFunction()) return;
Handle<Function> onread = Handle<Function>::Cast(onread_value);
const int argc = 1; const int argc = 1;
Handle<Value> argv[argc]; Handle<Value> argv[argc];
if(count) { if(len) {
if(socket->encoding_ == UTF8) { if(encoding_ == UTF8) {
// utf8 encoding // utf8 encoding
Handle<String> chunk = String::New((const char*)buf, count); Handle<String> chunk = String::New((const char*)buf, len);
argv[0] = chunk; argv[0] = chunk;
} else { } else {
// raw encoding // raw encoding
Local<Array> array = Array::New(count); Local<Array> array = Array::New(len);
for (int i = 0; i < count; i++) { for (size_t i = 0; i < len; i++) {
char val = static_cast<const char*>(buf)[i]; char val = static_cast<const char*>(buf)[i];
array->Set(Integer::New(i), Integer::New(val)); array->Set(Integer::New(i), Integer::New(val));
} }
argv[0] = array; argv[0] = array;
} }
} else { } else {
argv[0] = Local<Value>::New(Null()); argv[0] = Local<Value>::New(Null());
} }
TryCatch try_catch; TryCatch try_catch;
callback->Call(protocol, argc, argv);
Handle<Value> r = onread->Call(socket->handle_, argc, argv); if (try_catch.HasCaught())
fatal_exception(try_catch); // XXX is this the right action to take?
if(try_catch.HasCaught())
fatal_exception(try_catch);
} }
void void
Socket::OnClose (oi_socket *s) Connection::OnError (oi_error e)
{ {
Socket *socket = static_cast<Socket*> (s->data); }
HandleScope scope;
Handle<Value> onclose_value = socket->handle_->Get( String::NewSymbol("onClose") );
if (!onclose_value->IsFunction()) {
delete socket;
return;
}
Handle<Function> onclose = Handle<Function>::Cast(onclose_value);
TryCatch try_catch; #define DEFINE_SIMPLE_CALLBACK(name, symbol) \
void name () \
{ \
HandleScope scope; \
Local<Object> protocol = GetProtocol();\
Local<Value> callback_v = protocol->Get(symbol); \
if (!callback_v->IsFunction()) return; \
Handle<Function> callback = Handle<Function>::Cast(callback_v); \
callback->Call(protocol, 0, NULL); \
}
Handle<Value> r = onclose->Call(socket->handle_, 0, NULL); DEFINE_SIMPLE_CALLBACK(Connection::OnConnect, ON_CONNECT_SYMBOL)
DEFINE_SIMPLE_CALLBACK(Connection::OnDrain, ON_DRAIN_SYMBOL)
DEFINE_SIMPLE_CALLBACK(Connection::OnDisconnect, ON_DISCONNECT_SYMBOL)
DEFINE_SIMPLE_CALLBACK(Connection::OnTimeout, ON_TIMEOUT_SYMBOL)
if(try_catch.HasCaught()) void
fatal_exception(try_catch); Acceptor::Initialize (Handle<Object> target)
{
HandleScope scope;
delete socket; Local<FunctionTemplate> tcp_server_template =
FunctionTemplate::New(Acceptor::v8New);
tcp_server_template->InstanceTemplate()->SetInternalFieldCount(1);
target->Set(String::NewSymbol("TCPServer"), tcp_server_template->GetFunction());
NODE_SET_METHOD( tcp_server_template->InstanceTemplate()
, "listen"
, Acceptor::v8Listen
);
NODE_SET_METHOD( tcp_server_template->InstanceTemplate()
, "close"
, Acceptor::v8Close
);
} }
void Acceptor::Acceptor (Handle<Object> handle, Handle<Object> options)
Socket::OnDrain (oi_socket *s) : ObjectWrap(handle)
{ {
Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> ondrain_value = socket->handle_->Get( String::NewSymbol("onDrain") ); int backlog = 1024; // default value
if (!ondrain_value->IsFunction()) return; Local<Value> backlog_v = options->Get(String::NewSymbol("backlog"));
Handle<Function> ondrain = Handle<Function>::Cast(ondrain_value); if (backlog_v->IsInt32()) {
backlog = backlog_v->IntegerValue();
TryCatch try_catch; }
Handle<Value> r = ondrain->Call(socket->handle_, 0, NULL); Local<Value> on_error_v = options->Get(ON_ERROR_SYMBOL);
if (on_error_v->IsFunction()) {
handle_->Set(ON_ERROR_SYMBOL, on_error_v);
}
if(try_catch.HasCaught()) oi_server_init(&server_, backlog);
fatal_exception(try_catch); server_.on_connection = Acceptor::_OnConnection;
server_.on_error = Acceptor::_OnError;
server_.data = this;
} }
Connection*
void Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
Socket::OnError (oi_socket *s, oi_error e)
{ {
Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Local<Value> protocol_v = handle_->GetHiddenValue(PROTOCOL_SYMBOL);
if (!protocol_v->IsFunction()) {
Close();
return NULL;
}
Local<Function> protocol = Local<Function>::Cast(protocol_v);
Handle<Value> onerror_value = socket->handle_->Get( String::NewSymbol("onError") ); Local<Object> connection_handle = Connection::NewInstance(protocol);
if (!onerror_value->IsFunction()) return;
Handle<Function> onerror = Handle<Function>::Cast(onerror_value);
TryCatch try_catch; Connection *connection = new Connection(connection_handle);
return connection;
}
Handle<Value> r = onerror->Call(socket->handle_, 0, NULL); void
Acceptor::OnError (struct oi_error error)
{
HandleScope scope;
if(try_catch.HasCaught()) Local<Value> callback_v = handle_->Get(String::NewSymbol("onError"));
fatal_exception(try_catch); if (!callback_v->IsFunction()) return;
Local<Function> callback = Local<Function>::Cast(callback_v);
callback->Call(handle_, 0, NULL); // TODO args
} }
void Handle<Value>
Socket::OnTimeout (oi_socket *s) Acceptor::v8New (const Arguments& args)
{ {
Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> ontimeout_value = socket->handle_->Get( String::NewSymbol("onTimeout") ); if (args.Length() < 1 || args[0]->IsFunction() == false)
if (!ontimeout_value->IsFunction()) return; return ThrowException(String::New("Must at give connection handler as the first argument"));
Handle<Function> ontimeout = Handle<Function>::Cast(ontimeout_value);
TryCatch try_catch; /// set the handler
args.Holder()->SetHiddenValue(PROTOCOL_SYMBOL, args[0]);
Handle<Value> r = ontimeout->Call(socket->handle_, 0, NULL); Local<Object> options;
if(try_catch.HasCaught()) if (args.Length() > 1 && args[1]->IsObject()) {
fatal_exception(try_catch); options = args[1]->ToObject();
} } else {
options = Object::New();
}
void new Acceptor(args.Holder(), options);
Socket::Initialize (Handle<Object> target)
{
HandleScope scope;
Local<FunctionTemplate> template_local = FunctionTemplate::New(Socket::New); Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder());
socket_template = Persistent<FunctionTemplate>::New(template_local);
socket_template->InstanceTemplate()->SetInternalFieldCount(1);
target->Set(String::NewSymbol("Socket"), socket_template->GetFunction());
NODE_SET_METHOD(socket_template->InstanceTemplate(), "connectTCP", Socket::ConnectTCP); return args.This();
//NODE_SET_METHOD(socket_template->InstanceTemplate(), "connectUNIX", Socket::ConnectUNIX);
NODE_SET_METHOD(socket_template->InstanceTemplate(), "write", Socket::Write);
NODE_SET_METHOD(socket_template->InstanceTemplate(), "close", Socket::Close);
NODE_SET_METHOD(socket_template->InstanceTemplate(), "setEncoding", Socket::SetEncoding);
} }
void Handle<Value>
Server::Initialize (Handle<Object> target) Acceptor::v8Listen (const Arguments& args)
{ {
Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder());
if (args.Length() < 1)
return ThrowException(String::New("Must give at least a port as argument."));
char *host = NULL;
HandleScope scope; HandleScope scope;
String::AsciiValue port(args[0]->ToString());
if (args[1]->IsString()) {
String::Utf8Value host_sv(args[1]->ToString());
host = *host_sv;
}
Local<FunctionTemplate> server_template = FunctionTemplate::New(Server::New); // For servers call getaddrinfo inline. This is blocking but it shouldn't
server_template->InstanceTemplate()->SetInternalFieldCount(1); // matter--ever. If someone actually complains then simply swap it out
target->Set(String::NewSymbol("Server"), server_template->GetFunction()); // with a libeio call.
struct addrinfo *address = NULL;
int r = getaddrinfo(host, *port, &tcp_hints, &address);
if (r != 0)
return ThrowException(String::New(strerror(errno)));
NODE_SET_METHOD(server_template->InstanceTemplate(), "listenTCP", Server::ListenTCP); acceptor->Listen(address);
NODE_SET_METHOD(server_template->InstanceTemplate(), "close", Server::Close); return Undefined();
} }
Handle<Value>
Acceptor::v8Close (const Arguments& args)
{
Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder());
acceptor->Close();
return Undefined();
}

138
src/net.h

@ -7,59 +7,127 @@
namespace node { namespace node {
class Server : ObjectWrap { class Connection : public ObjectWrap {
public: public:
Server (v8::Handle<v8::Object> handle, int backlog); static void Initialize (v8::Handle<v8::Object> target);
~Server ();
static v8::Handle<v8::Value> New (const v8::Arguments& args); Connection (v8::Handle<v8::Object> handle);
static v8::Handle<v8::Value> ListenTCP (const v8::Arguments& args); ~Connection () {
static v8::Handle<v8::Value> Close (const v8::Arguments& args); Disconnect();
}
static void Initialize (v8::Handle<v8::Object> target); int Connect (struct addrinfo *address) {
private: return oi_socket_connect (&socket_, address);
static oi_socket* OnConnection (oi_server *, struct sockaddr *, socklen_t); }
oi_server server_;
};
static v8::Persistent<v8::FunctionTemplate> socket_template; void Send (oi_buf *buf) {
oi_socket_write (&socket_, buf);
}
class Socket : ObjectWrap { void Disconnect () {
public: oi_socket_close (&socket_);
Socket (v8::Handle<v8::Object> handle, double timeout); }
~Socket ();
// also a constructor
static Socket* NewConnection (double timeout);
protected:
static v8::Handle<v8::Value> v8New (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Connect (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Send (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Disconnect (const v8::Arguments& args);
void SetEncoding (v8::Handle<v8::Value>); void OnConnect (void);
void SetTimeout (double); void OnReceive (const void *buf, size_t len);
void OnDrain (void);
void OnDisconnect (void);
void OnError (oi_error e);
void OnTimeout (void);
static v8::Handle<v8::Value> New (const v8::Arguments& args); v8::Local<v8::Object> GetProtocol (void);
static v8::Handle<v8::Value> Write (const v8::Arguments& args); static v8::Local<v8::Object> NewInstance (v8::Local<v8::Function> protocol);
static v8::Handle<v8::Value> Close (const v8::Arguments& args);
static v8::Handle<v8::Value> ConnectTCP (const v8::Arguments& args);
static v8::Handle<v8::Value> SetEncoding (const v8::Arguments& args);
static void Initialize (v8::Handle<v8::Object> target);
private: private:
static void OnConnect (oi_socket *socket); static void _OnConnect (oi_socket *s) {
static void OnRead (oi_socket *s, const void *buf, size_t count); Connection *connection = static_cast<Connection*> (s->data);
static void OnDrain (oi_socket *s); connection->OnConnect();
static void OnError (oi_socket *s, oi_error e); }
static void OnClose (oi_socket *s);
static void OnTimeout (oi_socket *s); static void _OnReceive (oi_socket *s, const void *buf, size_t len) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnReceive(buf, len);
}
static void _OnDrain (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDrain();
}
static void _OnError (oi_socket *s, oi_error e) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnError(e);
}
static void _OnDisconnect (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDisconnect();
}
static void _OnTimeout (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnTimeout();
}
static int Resolve (eio_req *req); static int Resolve (eio_req *req);
static int AfterResolve (eio_req *req); static int AfterResolve (eio_req *req);
enum encoding encoding_; enum encoding encoding_;
oi_socket socket_;
char *host_; char *host_;
char *port_; char *port_;
oi_socket socket_;
friend class Acceptor;
};
friend class Server; class Acceptor : public ObjectWrap {
public:
static void Initialize (v8::Handle<v8::Object> target);
Acceptor (v8::Handle<v8::Object> handle, v8::Handle<v8::Object> options);
~Acceptor () { Close(); }
int Listen (struct addrinfo *address) {
int r = oi_server_listen (&server_, address);
if(r != 0) return r;
oi_server_attach (EV_DEFAULT_ &server_);
return 0;
}
void Close ( ) {
oi_server_close (&server_);
}
protected:
Connection* OnConnection (struct sockaddr *addr, socklen_t len);
void OnError (struct oi_error error);
static v8::Handle<v8::Value> v8New (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Listen (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Close (const v8::Arguments& args);
private:
static oi_socket* _OnConnection (oi_server *s, struct sockaddr *addr, socklen_t len) {
Acceptor *acceptor = static_cast<Acceptor*> (s->data);
Connection *connection = acceptor->OnConnection (addr, len);
if (connection)
return &connection->socket_;
else
return NULL;
}
static void _OnError (oi_server *s, struct oi_error error) {
Acceptor *acceptor = static_cast<Acceptor*> (s->data);
acceptor->OnError (error);
}
oi_server server_;
}; };
} // namespace node } // namespace node

28
src/node.cc

@ -20,23 +20,26 @@ using namespace std;
static int exit_code = 0; static int exit_code = 0;
ObjectWrap::~ObjectWrap ( )
{
if (!handle_.IsEmpty()) {
handle_->SetInternalField(0, Undefined());
handle_.Dispose();
handle_.Clear();
}
}
ObjectWrap::ObjectWrap (Handle<Object> handle) ObjectWrap::ObjectWrap (Handle<Object> handle)
{ {
v8::HandleScope scope; // TODO throw exception if it's already set
handle_ = v8::Persistent<v8::Object>::New(handle); HandleScope scope;
handle_ = Persistent<Object>::New(handle);
v8::Handle<v8::External> external = v8::External::New(this); Handle<External> external = External::New(this);
handle_->SetInternalField(0, external); handle_->SetInternalField(0, external);
handle_.MakeWeak(this, ObjectWrap::MakeWeak); handle_.MakeWeak(this, ObjectWrap::MakeWeak);
} }
ObjectWrap::~ObjectWrap ( )
{
handle_->SetInternalField(0, Undefined());
handle_.Dispose();
handle_.Clear();
}
void* void*
ObjectWrap::Unwrap (v8::Handle<v8::Object> handle) ObjectWrap::Unwrap (v8::Handle<v8::Object> handle)
{ {
@ -56,7 +59,6 @@ ObjectWrap::MakeWeak (Persistent<Value> _, void *data)
// Extracts a C string from a V8 Utf8Value. // Extracts a C string from a V8 Utf8Value.
const char* const char*
ToCString(const v8::String::Utf8Value& value) ToCString(const v8::String::Utf8Value& value)
@ -246,8 +248,8 @@ main (int argc, char *argv[])
// BUILT-IN MODULES // BUILT-IN MODULES
Socket::Initialize(g); Acceptor::Initialize(g);
Server::Initialize(g); Connection::Initialize(g);
node::Init_timer(g); node::Init_timer(g);
node::Init_file(g); node::Init_file(g);
node::Init_http(g); node::Init_http(g);

77
test/test-pingpong.js

@ -1,52 +1,63 @@
include("mjsunit"); include("mjsunit");
var port = 12123;
var N = 100; var N = 100;
function onLoad() { var count = 0;
server = new Server(1024);
var count = 0; var server;
server.listenTCP(12123, function (connection) {
puts("got connection."); function Ponger (socket) {
connection.onRead = function (data) { this.encoding = "UTF8";
assertTrue(count <= N);
if (data === null) { this.onConnect = function () {
server.close(); puts("got socket.");
connection.close(); };
return;
} this.onReceive = function (data) {
stdout.print ("-"); assertTrue(count <= N);
if (/QUIT/.exec(data)) { stdout.print ("-");
server.close(); if (/QUIT/.exec(data)) {
connection.close(); socket.disconnect();
} else if (/PING/.exec(data)) { //server.close();
connection.write("PONG"); } else if (/PING/.exec(data)) {
} socket.send("PONG");
} }
}); };
}
function Pinger (socket) {
this.encoding = "UTF8";
socket = new Socket; this.onConnect = function () {
socket.onRead = function (data) { socket.send("PING");
stdout.print ("."); };
this.onReceive = function (data) {
stdout.print(".");
assertEquals("PONG", data); assertEquals("PONG", data);
setTimeout(function() { setTimeout(function() {
count += 1; count += 1;
if (count < N) { if (count < N) {
socket.write("PING"); socket.send("PING");
} else { } else {
stdout.write ("\n"); stdout.write("\n");
socket.write("QUIT\n"); socket.send("QUIT\n");
socket.close(); socket.disconnect();
} }
}, 10); }, 10);
}; };
socket.onClose = function () { this.onDisconnect = function () {
puts("socket close."); puts("socket close.");
assertEquals(N, count); assertEquals(N, count);
server.close();
}; };
}
socket.connectTCP(12123, "127.0.0.1", function (status) { function onLoad() {
if(status != 0) server = new TCPServer(Ponger);
exit(1); server.listen(port);
socket.write("PING"); var client = new TCPConnection(Pinger);
}); client.connect(port);
} }

Loading…
Cancel
Save