Browse Source

Further expand EventEmitter to TCP and HTTP

The constructor for TCP servers can no longer take a connection handler for
purely technical reasons. (The constructor for EventEmitter is implemented
in C++ but addListener is in javascript, and I don't want to make too many
C++ -> Javascript references.) Thus I introduce new constructor methods to
ease the creation of the servers:

  node.tcp.createServer()
  node.http.createServer()

These work almost the same as the old constructors.

In general we're working towards a future where no constructors are
publicly exposed or take arguments.

The HTTP events like "on_uri" are not yet using the event interface.
onMessage still is a constructor - but this will change soon.
v0.7.4-release
Ryan 16 years ago
parent
commit
ed3d6a63d5
  1. 3
      src/events.cc
  2. 16
      src/http.cc
  3. 7
      src/http.h
  4. 166
      src/http.js
  5. 66
      src/net.cc
  6. 20
      src/net.h
  7. 2
      src/node.cc
  8. 2
      test/mjsunit/test-http-cat.js
  9. 2
      test/mjsunit/test-http-client-race.js
  10. 4
      test/mjsunit/test-http-proxy.js
  11. 2
      test/mjsunit/test-http-server.js
  12. 2
      test/mjsunit/test-http.js
  13. 2
      test/mjsunit/test-node-cat.js
  14. 2
      test/mjsunit/test-reconnecting-socket.js
  15. 2
      test/mjsunit/test-remote-module-loading.js
  16. 2
      test/mjsunit/test-tcp-pingpong.js

3
src/events.cc

@ -26,7 +26,8 @@ EventEmitter::Initialize (v8::Handle<v8::Object> target)
// All prototype methods are defined in events.js
target->Set(String::NewSymbol("EventEmitter"), constructor_template->GetFunction());
target->Set(String::NewSymbol("EventEmitter"),
constructor_template->GetFunction());
}
bool

16
src/http.cc

@ -301,7 +301,7 @@ HTTPServer::Initialize (Handle<Object> target)
Local<FunctionTemplate> t = FunctionTemplate::New(New);
constructor_template = Persistent<FunctionTemplate>::New(t);
constructor_template->Inherit(Acceptor::constructor_template);
constructor_template->Inherit(Server::constructor_template);
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
target->Set(String::NewSymbol("LowLevelServer"), constructor_template->GetFunction());
}
@ -311,19 +311,7 @@ HTTPServer::New (const Arguments& args)
{
HandleScope scope;
if (args.Length() < 1 || args[0]->IsFunction() == false)
return ThrowException(String::New("Must at give connection handler as the first argument"));
Local<Function> protocol_class = Local<Function>::Cast(args[0]);
Local<Object> options;
if (args.Length() > 1 && args[1]->IsObject()) {
options = args[1]->ToObject();
} else {
options = Object::New();
}
HTTPServer *s = new HTTPServer(args.This(), protocol_class, options);
HTTPServer *s = new HTTPServer(args.This());
ObjectWrap::InformV8ofAllocation(s);
return args.This();

7
src/http.h

@ -38,7 +38,7 @@ protected:
friend class HTTPServer;
};
class HTTPServer : public Acceptor {
class HTTPServer : public Server {
public:
static void Initialize (v8::Handle<v8::Object> target);
static v8::Persistent<v8::FunctionTemplate> constructor_template;
@ -48,10 +48,7 @@ public:
protected:
static v8::Handle<v8::Value> New (const v8::Arguments& args);
HTTPServer (v8::Handle<v8::Object> handle,
v8::Handle<v8::Function> protocol_class,
v8::Handle<v8::Object> options)
: Acceptor(handle, protocol_class, options) {}
HTTPServer (v8::Handle<v8::Object> handle) : Server(handle) {}
v8::Handle<v8::FunctionTemplate> GetConnectionTemplate (void);
Connection* UnwrapConnection (v8::Local<v8::Object> connection);

166
src/http.js

@ -229,93 +229,93 @@ node.http.ServerResponse = function (connection, responses) {
/* This is a wrapper around the LowLevelServer interface. It provides
* connection handling, overflow checking, and some data buffering.
*/
node.http.Server = function (RequestHandler, options) {
if (!(this instanceof node.http.Server))
throw Error("Constructor called as a function");
var server = this;
function ConnectionHandler (connection) {
// An array of responses for each connection. In pipelined connections
// we need to keep track of the order they were sent.
var responses = [];
connection.onMessage = function ( ) {
var interrupted = false;
// filled in ...
var req = { method : null // at onHeadersComplete
, uri : "" // at onURI
, httpVersion : null // at onHeadersComplete
, headers : [] // at onHeaderField, onHeaderValue
, onBody : null // by user
, onBodyComplete : null // by user
, interrupt : function ( ) { interrupted = true; }
, setBodyEncoding : function (enc) {
connection.setEncoding(enc);
}
};
var res = new node.http.ServerResponse(connection, responses);
this.onURI = function (data) {
req.uri += data;
return !interrupted;
};
var last_was_value = false;
var headers = req.headers;
this.onHeaderField = function (data) {
if (headers.length > 0 && last_was_value == false)
headers[headers.length-1][0] += data;
else
headers.push([data]);
last_was_value = false;
return !interrupted;
};
this.onHeaderValue = function (data) {
var last_pair = headers[headers.length-1];
if (last_pair.length == 1)
last_pair[1] = data;
else
last_pair[1] += data;
last_was_value = true;
return !interrupted;
};
this.onHeadersComplete = function () {
req.httpVersion = this.httpVersion;
req.method = this.method;
// TODO parse the URI lazily?
req.uri = node.http.parseUri(req.uri);
res.should_keep_alive = this.should_keep_alive;
RequestHandler.apply(server, [req, res]);
return !interrupted;
};
this.onBody = function (chunk) {
if (req.onBody) req.onBody(chunk);
return !interrupted;
};
this.onMessageComplete = function () {
if (req.onBodyComplete) req.onBodyComplete();
return !interrupted;
};
node.http.createServer = function (requestHandler, options) {
var server = new node.http.LowLevelServer();
server.addListener("Connection", node.http.connectionListener);
//server.setOptions(options);
server.requestHandler = requestHandler;
return server;
};
node.http.connectionListener = function (connection) {
// An array of responses for each connection. In pipelined connections
// we need to keep track of the order they were sent.
var responses = [];
// is this really needed?
connection.addListener("EOF", function () {
if (responses.length == 0) {
connection.close();
} else {
responses[responses.length-1].closeOnFinish = true;
}
});
connection.onMessage = function () {
var interrupted = false;
// filled in ...
var req = { method : null // at onHeadersComplete
, uri : "" // at onURI
, httpVersion : null // at onHeadersComplete
, headers : [] // at onHeaderField, onHeaderValue
, onBody : null // by user
, onBodyComplete : null // by user
, interrupt : function ( ) { interrupted = true; }
, setBodyEncoding : function (enc) {
connection.setEncoding(enc);
}
};
var res = new node.http.ServerResponse(connection, responses);
this.onURI = function (data) {
req.uri += data;
return !interrupted;
};
// is this really needed?
connection.onEOF = function () {
if (responses.length == 0) {
connection.close();
} else {
responses[responses.length-1].closeOnFinish = true;
}
var last_was_value = false;
var headers = req.headers;
this.onHeaderField = function (data) {
if (headers.length > 0 && last_was_value == false)
headers[headers.length-1][0] += data;
else
headers.push([data]);
last_was_value = false;
return !interrupted;
};
}
this.__proto__ = new node.http.LowLevelServer(ConnectionHandler, options);
this.onHeaderValue = function (data) {
var last_pair = headers[headers.length-1];
if (last_pair.length == 1)
last_pair[1] = data;
else
last_pair[1] += data;
last_was_value = true;
return !interrupted;
};
this.onHeadersComplete = function () {
req.httpVersion = this.httpVersion;
req.method = this.method;
// TODO parse the URI lazily?
req.uri = node.http.parseUri(req.uri);
res.should_keep_alive = this.should_keep_alive;
connection.server.requestHandler.apply(connection.server, [req, res]);
return !interrupted;
};
this.onBody = function (chunk) {
if (req.onBody) req.onBody(chunk);
return !interrupted;
};
this.onMessageComplete = function () {
if (req.onBodyComplete) req.onBodyComplete();
return !interrupted;
};
};
};
node.http.Client = function (port, host) {

66
src/net.cc

@ -132,11 +132,9 @@ Connection::~Connection ()
}
void
Connection::SetAcceptor (Handle<Object> acceptor_handle)
Connection::SetServer (Handle<Object> server_handle)
{
HandleScope scope;
handle_->Set(SERVER_SYMBOL, acceptor_handle);
Attach();
}
Handle<Value>
@ -480,10 +478,10 @@ DEFINE_SIMPLE_CALLBACK(Connection::OnDrain, "Drain")
DEFINE_SIMPLE_CALLBACK(Connection::OnTimeout, "Timeout")
DEFINE_SIMPLE_CALLBACK(Connection::OnEOF, "EOF")
Persistent<FunctionTemplate> Acceptor::constructor_template;
Persistent<FunctionTemplate> Server::constructor_template;
void
Acceptor::Initialize (Handle<Object> target)
Server::Initialize (Handle<Object> target)
{
HandleScope scope;
@ -498,12 +496,13 @@ Acceptor::Initialize (Handle<Object> target)
target->Set(String::NewSymbol("Server"), constructor_template->GetFunction());
}
Acceptor::Acceptor (Handle<Object> handle, Handle<Function> connection_handler, Handle<Object> options)
Server::Server (Handle<Object> handle)
: EventEmitter(handle)
{
HandleScope scope;
#if 0
// TODO SetOptions
handle_->SetHiddenValue(CONNECTION_HANDLER_SYMBOL, connection_handler);
int backlog = 1024; // default value
@ -514,12 +513,12 @@ Acceptor::Acceptor (Handle<Object> handle, Handle<Function> connection_handler,
#endif
oi_server_init(&server_, 1024);
server_.on_connection = Acceptor::on_connection;
server_.on_connection = Server::on_connection;
server_.data = this;
}
static void
SetRemoteAddress (Local<Object> connection_handle, struct sockaddr *addr)
static Local<String>
GetAddressString (struct sockaddr *addr)
{
HandleScope scope;
char ip[INET6_ADDRSTRLEN];
@ -537,24 +536,24 @@ SetRemoteAddress (Local<Object> connection_handle, struct sockaddr *addr)
} else assert(0 && "received a bad sa_family");
connection_handle->Set(REMOTE_ADDRESS_SYMBOL, remote_address);
return scope.Close(remote_address);
}
Handle<FunctionTemplate>
Acceptor::GetConnectionTemplate (void)
Server::GetConnectionTemplate (void)
{
return Connection::constructor_template;
}
Connection*
Acceptor::UnwrapConnection (Local<Object> connection)
Server::UnwrapConnection (Local<Object> connection)
{
HandleScope scope;
return NODE_UNWRAP(Connection, connection);
}
Connection*
Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
Server::OnConnection (struct sockaddr *addr, socklen_t len)
{
HandleScope scope;
@ -568,12 +567,14 @@ Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
return NULL;
}
SetRemoteAddress(js_connection, addr);
Local<String> remote_address = GetAddressString(addr);
js_connection->Set(REMOTE_ADDRESS_SYMBOL, remote_address);
js_connection->Set(SERVER_SYMBOL, handle_);
Connection *connection = UnwrapConnection(js_connection);
if (!connection) return NULL;
connection->SetAcceptor(handle_);
connection->Attach();
Handle<Value> argv[1] = { js_connection };
@ -582,34 +583,25 @@ Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
return connection;
}
// TODO Server->SetOptions
// TODO Server -> Server rename
Handle<Value>
Acceptor::New (const Arguments& args)
Server::New (const Arguments& args)
{
HandleScope scope;
if (args.Length() < 1 || args[0]->IsFunction() == false)
return ThrowException(String::New("Must at give connection handler as the first argument"));
Local<Function> connection_handler = Local<Function>::Cast(args[0]);
Local<Object> options;
if (args.Length() > 1 && args[1]->IsObject()) {
options = args[1]->ToObject();
} else {
options = Object::New();
}
Acceptor *a = new Acceptor(args.This(), connection_handler, options);
Server *a = new Server(args.This());
ObjectWrap::InformV8ofAllocation(a);
return args.This();
}
Handle<Value>
Acceptor::Listen (const Arguments& args)
Server::Listen (const Arguments& args)
{
Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder());
if (!acceptor) return Handle<Value>();
Server *server = NODE_UNWRAP(Server, args.Holder());
if (!server) return Handle<Value>();
if (args.Length() == 0)
return ThrowException(String::New("Must give at least a port as argument."));
@ -635,7 +627,7 @@ Acceptor::Listen (const Arguments& args)
address = AddressDefaultToIPv4(address_list);
acceptor->Listen(address);
server->Listen(address);
if (address_list) freeaddrinfo(address_list);
@ -643,11 +635,11 @@ Acceptor::Listen (const Arguments& args)
}
Handle<Value>
Acceptor::Close (const Arguments& args)
Server::Close (const Arguments& args)
{
Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder());
if (!acceptor) return Handle<Value>();
Server *server = NODE_UNWRAP(Server, args.Holder());
if (!server) return Handle<Value>();
acceptor->Close();
server->Close();
return Undefined();
}

20
src/net.h

@ -8,7 +8,7 @@
namespace node {
class Acceptor;
class Server;
class Connection : public EventEmitter {
public:
@ -42,7 +42,7 @@ protected:
void FullClose (void) { oi_socket_full_close(&socket_); }
void ForceClose (void) { oi_socket_force_close(&socket_); }
void SetAcceptor (v8::Handle<v8::Object> acceptor_handle);
void SetServer (v8::Handle<v8::Object> server_handle);
virtual void OnConnect (void);
virtual void OnReceive (const void *buf, size_t len);
@ -105,14 +105,14 @@ private:
char *port_;
oi_socket socket_;
friend class Acceptor;
friend class Server;
};
class Acceptor : public EventEmitter {
class Server : public EventEmitter {
public:
static void Initialize (v8::Handle<v8::Object> target);
virtual size_t size (void) { return sizeof(Acceptor); };
virtual size_t size (void) { return sizeof(Server); };
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
@ -120,10 +120,8 @@ protected:
static v8::Handle<v8::Value> Listen (const v8::Arguments& args);
static v8::Handle<v8::Value> Close (const v8::Arguments& args);
Acceptor (v8::Handle<v8::Object> handle,
v8::Handle<v8::Function> connection_handler,
v8::Handle<v8::Object> options);
virtual ~Acceptor () { Close(); }
Server (v8::Handle<v8::Object> handle);
virtual ~Server () { Close(); }
int Listen (struct addrinfo *address) {
int r = oi_server_listen (&server_, address);
@ -144,8 +142,8 @@ protected:
private:
Connection* OnConnection (struct sockaddr *addr, socklen_t len);
static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) {
Acceptor *acceptor = static_cast<Acceptor*> (s->data);
Connection *connection = acceptor->OnConnection (addr, len);
Server *server = static_cast<Server*> (s->data);
Connection *connection = server->OnConnection (addr, len);
return &connection->socket_;
}

2
src/node.cc

@ -325,7 +325,7 @@ Load (int argc, char *argv[])
Local<Object> tcp = Object::New();
node_obj->Set(String::New("tcp"), tcp);
Acceptor::Initialize(tcp);
Server::Initialize(tcp);
Connection::Initialize(tcp);
Local<Object> http = Object::New();

2
test/mjsunit/test-http-cat.js

@ -2,7 +2,7 @@ include("mjsunit.js");
PORT = 8888;
var body = "exports.A = function() { return 'A';}";
var server = new node.http.Server(function (req, res) {
var server = node.http.createServer(function (req, res) {
res.sendHeader(200, [
["Content-Length", body.length],
["Content-Type", "text/plain"]

2
test/mjsunit/test-http-client-race.js

@ -4,7 +4,7 @@ PORT = 8888;
var body1_s = "1111111111111111";
var body2_s = "22222";
var server = new node.http.Server(function (req, res) {
var server = node.http.createServer(function (req, res) {
var body = req.uri.path === "/1" ? body1_s : body2_s;
res.sendHeader(200, [
["Content-Type", "text/plain"],

4
test/mjsunit/test-http-proxy.js

@ -3,7 +3,7 @@ include("mjsunit.js");
var PROXY_PORT = 8869;
var BACKEND_PORT = 8870;
var backend = new node.http.Server(function (req, res) {
var backend = node.http.createServer(function (req, res) {
// node.debug("backend");
res.sendHeader(200, [["content-type", "text/plain"]]);
res.sendBody("hello world\n");
@ -13,7 +13,7 @@ var backend = new node.http.Server(function (req, res) {
backend.listen(BACKEND_PORT);
var proxy_client = new node.http.Client(BACKEND_PORT);
var proxy = new node.http.Server(function (req, res) {
var proxy = node.http.createServer(function (req, res) {
// node.debug("proxy req");
var proxy_req = proxy_client.get(req.uri.path);
proxy_req.finish(function(proxy_res) {

2
test/mjsunit/test-http-server.js

@ -9,7 +9,7 @@ var client_got_eof = false;
function onLoad() {
new node.http.Server(function (req, res) {
node.http.createServer(function (req, res) {
res.id = request_number;
req.id = request_number++;

2
test/mjsunit/test-http.js

@ -7,7 +7,7 @@ var body0 = "";
var body1 = "";
function onLoad () {
new node.http.Server(function (req, res) {
node.http.createServer(function (req, res) {
if (responses_sent == 0) {
assertEquals("GET", req.method);
assertEquals("/hello", req.uri.path);

2
test/mjsunit/test-node-cat.js

@ -2,7 +2,7 @@ include("mjsunit.js");
PORT = 8888;
var body = "exports.A = function() { return 'A';}";
var server = new node.http.Server(function (req, res) {
var server = node.http.createServer(function (req, res) {
res.sendHeader(200, [
["Content-Length", body.length],
["Content-Type", "text/plain"]

2
test/mjsunit/test-reconnecting-socket.js

@ -7,7 +7,7 @@ var client_recv_count = 0;
function onLoad () {
var server = new node.tcp.Server(function (socket) {
var server = node.tcp.createServer(function (socket) {
socket.onConnect = function () {
socket.send("hello\r\n");
};

2
test/mjsunit/test-remote-module-loading.js

@ -1,4 +1,4 @@
var s = new node.http.Server(function (req, res) {
var s = node.http.createServer(function (req, res) {
var body = "exports.A = function() { return 'A';}";
res.sendHeader(200, [
["Content-Length", body.length],

2
test/mjsunit/test-tcp-pingpong.js

@ -8,7 +8,7 @@ function pingPongTest (port, host, on_complete) {
var count = 0;
var sent_final_ping = false;
var server = new node.tcp.Server(function (socket) {
var server = node.tcp.createServer(function (socket) {
assertTrue(socket.remoteAddress !== null);
assertTrue(socket.remoteAddress !== undefined);
if (host === "127.0.0.1")

Loading…
Cancel
Save