Browse Source

Implement half-closed streams

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
0618f02f6f
  1. 302
      lib/net.js
  2. 4
      src/node_net2.cc
  3. 19
      test-net-server.js

302
lib/net.js

@ -1,36 +1,37 @@
var debugLevel = 0; var debugLevel = 0;
if ("NODE_DEBUG" in process.ENV) debugLevel = 1; if ('NODE_DEBUG' in process.ENV) debugLevel = 1;
function debug (x) { function debug (x) {
if (debugLevel > 0) { if (debugLevel > 0) {
process.stdio.writeError(x + "\n"); process.stdio.writeError(x + '\n');
} }
} }
var socket = process.socket;
var bind = process.bind; var socket = process.socket;
var listen = process.listen; var bind = process.bind;
var accept = process.accept; var connect = process.connect;
var close = process.close; var listen = process.listen;
var shutdown = process.shutdown; var accept = process.accept;
var read = process.read; var close = process.close;
var write = process.write; var shutdown = process.shutdown;
var toRead = process.toRead; var read = process.read;
var write = process.write;
var Stream = function (peerInfo) { var toRead = process.toRead;
var socketError = process.socketError;
var EINPROGRESS = process.EINPROGRESS;
function Stream (peerInfo) {
process.EventEmitter.call(); process.EventEmitter.call();
var self = this; var self = this;
self.fd = peerInfo.fd;
self.remoteAddress = peerInfo.remoteAddress;
self.remotePort = peerInfo.remotePort;
// Allocated on demand. // Allocated on demand.
self.recvBuffer = null; self.recvBuffer = null;
self.sendQueue = []; self.sendQueue = [];
self.readWatcher = new process.IOWatcher(function () { self.readWatcher = new process.IOWatcher(function () {
debug("\n" + self.fd + " readable"); debug('\n' + self.fd + ' readable');
// If this is the first recv (recvBuffer doesn't exist) or we've used up // If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one. // most of the recvBuffer, allocate a new one.
@ -39,36 +40,50 @@ var Stream = function (peerInfo) {
self._allocateNewRecvBuf(); self._allocateNewRecvBuf();
} }
debug("recvBuffer.used " + self.recvBuffer.used); debug('recvBuffer.used ' + self.recvBuffer.used);
var bytesRead = read(self.fd, var bytesRead = read(self.fd,
self.recvBuffer, self.recvBuffer,
self.recvBuffer.used, self.recvBuffer.used,
self.recvBuffer.length - self.recvBuffer.used); self.recvBuffer.length - self.recvBuffer.used);
debug("bytesRead " + bytesRead + "\n"); debug('bytesRead ' + bytesRead + '\n');
if (bytesRead == 0) { if (bytesRead == 0) {
self.readable = false; self.readable = false;
self.readWatcher.stop(); self.readWatcher.stop();
self.emit("eof"); self.emit('eof');
if (!self.writable) self.forceClose();
} else { } else {
var slice = self.recvBuffer.slice(self.recvBuffer.used, var slice = self.recvBuffer.slice(self.recvBuffer.used,
self.recvBuffer.used + bytesRead); self.recvBuffer.used + bytesRead);
self.recvBuffer.used += bytesRead; self.recvBuffer.used += bytesRead;
self.emit("receive", slice); self.emit('receive', slice);
} }
}); });
self.readWatcher.set(self.fd, true, false);
self.readWatcher.start();
self.writeWatcher = new process.IOWatcher(function () { self._onWriteFlush = function () {
self.flush(); self.flush();
}); };
self.writeWatcher.set(self.fd, false, true);
self.writeWatcher = new process.IOWatcher(self._onWriteFlush);
self.readable = false;
self.writable = false;
if (peerInfo) {
self.fd = peerInfo.fd;
self.remoteAddress = peerInfo.remoteAddress;
self.remotePort = peerInfo.remotePort;
self.readable = true; self.readWatcher.set(self.fd, true, false);
self.writable = true; self.readWatcher.start();
self.writeWatcher.set(self.fd, false, true);
self.readable = true;
self.writable = true;
}
}; };
process.inherits(Stream, process.EventEmitter); process.inherits(Stream, process.EventEmitter);
exports.Stream = Stream;
Stream.prototype._allocateNewRecvBuf = function () { Stream.prototype._allocateNewRecvBuf = function () {
var self = this; var self = this;
@ -95,6 +110,7 @@ Stream.prototype._allocateNewRecvBuf = function () {
self.recvBuffer.used = 0; self.recvBuffer.used = 0;
}; };
Stream.prototype._allocateSendBuffer = function () { Stream.prototype._allocateSendBuffer = function () {
var b = new process.Buffer(1024); var b = new process.Buffer(1024);
b.used = 0; b.used = 0;
@ -103,56 +119,62 @@ Stream.prototype._allocateSendBuffer = function () {
return b; return b;
}; };
Stream.prototype.send = function (data, encoding) {
Stream.prototype._sendString = function (data, encoding) {
var self = this; var self = this;
if (typeof(data) == "string") { var buffer;
var buffer; if (self.sendQueue.length == 0) {
if (self.sendQueue.length == 0) { buffer = self._allocateSendBuffer();
buffer = self._allocateSendBuffer(); } else {
} else { // walk through the sendQueue, find the buffer with free space
// walk through the sendQueue, find the first empty buffer for (var i = 0; i < self.sendQueue.length; i++) {
for (var i = 0; i < self.sendQueue.length; i++) { if (self.sendQueue[i].used == 0) {
if (self.sendQueue[i].used == 0) { buffer = self.sendQueue[i];
buffer = self.sendQueue[i]; break;
break;
}
}
// if we didn't find one, take the last
if (!buffer) {
buffer = self.sendQueue[self.sendQueue.length-1];
// if last buffer is empty
if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
} }
} }
// if we didn't find one, take the last
if (!buffer) {
buffer = self.sendQueue[self.sendQueue.length-1];
// if last buffer is used up
if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
}
}
encoding = encoding || "ascii"; // default to ascii since it's faster encoding = encoding || 'ascii'; // default to ascii since it's faster
var charsWritten; var charsWritten;
if (encoding.toLowerCase() == "utf8") { if (encoding.toLowerCase() == 'utf8') {
charsWritten = buffer.utf8Write(data, charsWritten = buffer.utf8Write(data,
buffer.used, buffer.used,
buffer.length - buffer.used); buffer.length - buffer.used);
buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
} else { } else {
// ascii // ascii
charsWritten = buffer.asciiWrite(data, charsWritten = buffer.asciiWrite(data,
buffer.used, buffer.used,
buffer.length - buffer.used); buffer.length - buffer.used);
buffer.used += charsWritten; buffer.used += charsWritten;
debug("ascii charsWritten " + charsWritten); debug('ascii charsWritten ' + charsWritten);
debug("ascii buffer.used " + buffer.used); debug('ascii buffer.used ' + buffer.used);
} }
// If we didn't finish, then recurse with the rest of the string. // If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) { if (charsWritten < data.length) {
debug("recursive send"); debug('recursive send');
self.send(data.slice(charsWritten), encoding); self._sendString(data.slice(charsWritten), encoding);
} }
};
Stream.prototype.send = function (data, encoding) {
var self = this;
if (typeof(data) == 'string') {
self._sendString(data, encoding);
} else { } else {
// data is a process.Buffer // data is a process.Buffer
// walk through the sendQueue, find the first empty buffer // walk through the sendQueue, find the first empty buffer
var inserted = false; var inserted = false;
data.sent = 0; data.sent = 0;
@ -171,6 +193,7 @@ Stream.prototype.send = function (data, encoding) {
this.flush(); this.flush();
}; };
// returns true if flushed without getting EAGAIN // returns true if flushed without getting EAGAIN
// false if it got EAGAIN // false if it got EAGAIN
Stream.prototype.flush = function () { Stream.prototype.flush = function () {
@ -194,51 +217,124 @@ Stream.prototype.flush = function () {
return false; return false;
} }
b.sent += bytesWritten; b.sent += bytesWritten;
debug("bytes sent: " + b.sent); debug('bytes sent: ' + b.sent);
} }
this.writeWatcher.stop(); this.writeWatcher.stop();
return true; return true;
}; };
Stream.prototype.close = function () {
this.readable = false;
this.writable = false;
this.writeWatcher.stop(); // var stream = new Stream();
this.readWatcher.stop(); // stream.connect(80) - TCP connect to port 80 on the localhost
close(this.fd); // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org
debug("close peer " + this.fd); // stream.connect('/tmp/socket') - UNIX connect to socket specified by path
this.fd = null; Stream.prototype.connect = function () {
var self = this;
if (self.fd) throw new Error('Stream already opened');
if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
self.fd = process.socket('UNIX');
// TODO check if sockfile exists?
} else {
self.fd = process.socket('TCP');
// TODO dns resolution on arguments[1]
}
try {
connect(self.fd, arguments[0], arguments[1]);
} catch (e) {
close(self.fd);
throw e;
}
// Don't start the read watcher until connection is established
self.readWatcher.set(self.fd, true, false);
// How to connect on POSIX: Wait for fd to become writable, then call
// socketError() if there isn't an error, we're connected. AFAIK this a
// platform independent way determining when a non-blocking connection
// is established, but I have only seen it documented in the Linux
// Manual Page connect(2) under the error code EINPROGRESS.
self.writeWatcher.set(self.fd, false, true);
self.writeWatcher.start();
self.writeWatcher.callback = function () {
var errno = socketError(self.fd);
if (errno == 0) {
// connection established
self.emit('connect');
self.readWatcher.start();
self.readable = true;
self.writable = true;
self.writeWatcher.callback = self._onWriteFlush;
} else if (errno != EINPROGRESS) {
var e = new Error('connection error');
e.errno = errno;
self.readWatcher.stop();
self.writeWatcher.stop();
close(self.fd);
}
};
};
Stream.prototype.forceClose = function (exception) {
if (this.fd) {
this.readable = false;
this.writable = false;
this.writeWatcher.stop();
this.readWatcher.stop();
close(this.fd);
debug('close peer ' + this.fd);
this.fd = null;
this.emit('close', exception);
}
}; };
var Server = function (listener) {
Stream.prototype.close = function () {
if (this.readable && this.writable) {
this.writable = false;
shutdown(this.fd, "write");
} else if (!this.readable && this.writable) {
// already got EOF
this.forceClose(this.fd);
}
// In the case we've already shutdown write side,
// but haven't got EOF: ignore. In the case we're
// fully closed already: ignore.
};
function Server (listener) {
var self = this; var self = this;
if (listener) { if (listener) {
self.addListener("connection", listener); self.addListener('connection', listener);
} }
self.watcher = new process.IOWatcher(function (readable, writeable) { self.watcher = new process.IOWatcher(function (readable, writeable) {
while (self.fd) { while (self.fd) {
var peerInfo = accept(self.fd); var peerInfo = accept(self.fd);
debug("accept: " + JSON.stringify(peerInfo)); debug('accept: ' + JSON.stringify(peerInfo));
if (!peerInfo) return; if (!peerInfo) return;
var peer = new Stream(peerInfo); var peer = new Stream(peerInfo);
self.emit("connection", peer); self.emit('connection', peer);
} }
}); });
}; };
process.inherits(Server, process.EventEmitter); process.inherits(Server, process.EventEmitter);
exports.Server = Server;
Server.prototype.listen = function () { Server.prototype.listen = function () {
var self = this; var self = this;
if (self.fd) throw new Error('Server already opened');
if (self.fd) throw new Error("Already running");
var backlogIndex; var backlogIndex;
if (typeof(arguments[0]) == "string" && arguments.length == 1) { if (typeof(arguments[0]) == 'string' && arguments.length == 1) {
// the first argument specifies a path // the first argument specifies a path
self.fd = process.socket("UNIX"); self.fd = process.socket('UNIX');
// TODO unlink sockfile if exists? // TODO unlink sockfile if exists?
// if (lstat(SOCKFILE, &tstat) == 0) { // if (lstat(SOCKFILE, &tstat) == 0) {
// assert(S_ISSOCK(tstat.st_mode)); // assert(S_ISSOCK(tstat.st_mode));
@ -248,46 +344,24 @@ Server.prototype.listen = function () {
backlogIndex = 1; backlogIndex = 1;
} else { } else {
// the first argument is the port, the second an IP // the first argument is the port, the second an IP
self.fd = process.socket("TCP"); self.fd = process.socket('TCP');
// TODO dns resolution on arguments[1] // TODO dns resolution on arguments[1]
bind(self.fd, arguments[0], arguments[1]); bind(self.fd, arguments[0], arguments[1]);
backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1; backlogIndex = typeof(arguments[1]) == 'string' ? 2 : 1;
} }
listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128); listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
self.emit("listening");
self.watcher.set(self.fd, true, false); self.watcher.set(self.fd, true, false);
self.watcher.start(); self.watcher.start();
}; };
Server.prototype.close = function () { Server.prototype.close = function () {
var self = this; var self = this;
if (!self.fd) throw new Error("Not running"); if (!self.fd) throw new Error('Not running');
self.watcher.stop(); self.watcher.stop();
close(self.fd); close(self.fd);
self.fd = null; self.fd = null;
}; };
///////////////////////////////////////////////////////
process.Buffer.prototype.toString = function () {
return this.utf8Slice(0, this.length);
};
var sys = require("sys");
var server = new Server(function (peer) {
sys.puts("connection (" + peer.fd + "): "
+ peer.remoteAddress
+ " port "
+ peer.remotePort
);
sys.puts("server fd: " + server.fd);
peer.addListener("receive", function (b) {
sys.puts("recv (" + b.length + "): " + b);
peer.send("pong\r\n");
});
});
//server.listen(8000);
server.listen(8000);
sys.puts("server fd: " + server.fd);

4
src/node_net2.cc

@ -377,7 +377,7 @@ static Handle<Value> Accept(const Arguments& args) {
} }
static Handle<Value> GetSocketError(const Arguments& args) { static Handle<Value> SocketError(const Arguments& args) {
HandleScope scope; HandleScope scope;
FD_ARG(args[0]) FD_ARG(args[0])
@ -515,7 +515,7 @@ void InitNet2(Handle<Object> target) {
NODE_SET_METHOD(target, "bind", Bind); NODE_SET_METHOD(target, "bind", Bind);
NODE_SET_METHOD(target, "listen", Listen); NODE_SET_METHOD(target, "listen", Listen);
NODE_SET_METHOD(target, "accept", Accept); NODE_SET_METHOD(target, "accept", Accept);
NODE_SET_METHOD(target, "getSocketError", GetSocketError); NODE_SET_METHOD(target, "socketError", SocketError);
NODE_SET_METHOD(target, "toRead", ToRead); NODE_SET_METHOD(target, "toRead", ToRead);

19
test-net-server.js

@ -18,6 +18,25 @@ var server = new net.Server(function (stream) {
stream.send(b); stream.send(b);
stream.send("pong utf8\r\n", "utf8"); stream.send("pong utf8\r\n", "utf8");
}); });
stream.addListener("eof", function () {
sys.puts("server peer eof");
stream.close();
});
}); });
server.listen(8000); server.listen(8000);
sys.puts("server fd: " + server.fd); sys.puts("server fd: " + server.fd);
var stream = new net.Stream();
stream.addListener('connect', function () {
sys.puts("!!!client connected");
stream.send("hello\n");
});
stream.addListener('receive', function (d) {
sys.puts("!!!client got: " + JSON.stringify(d.toString()));
});
stream.connect(8000);

Loading…
Cancel
Save