Browse Source

[net2] Better EOF marking, rename events

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
25700e65ee
  1. 51
      lib/net.js
  2. 2
      src/node_net2.cc
  3. 26
      test-net-server.js

51
lib/net.js

@ -24,6 +24,7 @@ var getsockname = process.getsockname;
var getaddrinfo = process.getaddrinfo; var getaddrinfo = process.getaddrinfo;
var needsLookup = process.needsLookup; var needsLookup = process.needsLookup;
var EINPROGRESS = process.EINPROGRESS; var EINPROGRESS = process.EINPROGRESS;
var END_OF_FILE = 42;
function Socket (peerInfo) { function Socket (peerInfo) {
@ -59,7 +60,7 @@ function Socket (peerInfo) {
var slice = self.recvBuffer.slice(self.recvBuffer.used, var slice = self.recvBuffer.slice(self.recvBuffer.used,
self.recvBuffer.used + bytesRead); self.recvBuffer.used + bytesRead);
self.recvBuffer.used += bytesRead; self.recvBuffer.used += bytesRead;
self.emit('receive', slice); self.emit('data', slice);
} }
}; };
self.readable = false; self.readable = false;
@ -153,7 +154,7 @@ Socket.prototype._sendString = function (data, encoding) {
} }
// if we didn't find one, take the last // if we didn't find one, take the last
if (!buffer) { if (!buffer) {
buffer = self.sendQueue[self.sendQueue.length-1]; buffer = self._sendQueueLast();
// if last buffer is used up // if last buffer is used up
if (buffer.length == buffer.used) buffer = self._allocateSendBuffer(); if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
} }
@ -191,12 +192,24 @@ Socket.prototype._sendString = function (data, encoding) {
}; };
Socket.prototype._sendQueueLast = function () {
return this.sendQueue.length > 0 ? this.sendQueue[this.sendQueue.length-1]
: null;
};
// Returns true if all the data was flushed to socket. Returns false if // Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will // something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket. // signal when it has been finally flushed to socket.
Socket.prototype.send = function (data, encoding) { Socket.prototype.send = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
if (self._sendQueueLast == END_OF_FILE) {
throw new Error('socket.close() called already; cannot write.');
}
if (typeof(data) == 'string') { if (typeof(data) == 'string') {
self._sendString(data, encoding); self._sendString(data, encoding);
} else { } else {
@ -225,12 +238,18 @@ Socket.prototype.send = function (data, encoding) {
// Flushes the write buffer out. Emits "drain" if the buffer is empty. // Flushes the write buffer out. Emits "drain" if the buffer is empty.
Socket.prototype.flush = function () { Socket.prototype.flush = function () {
var self = this; var self = this;
if (!self.writable) throw new Error('Socket is not writable');
var bytesWritten; var bytesWritten;
while (self.sendQueue.length > 0) { while (self.sendQueue.length > 0) {
if (!self.writable) throw new Error('Socket is not writable');
var b = self.sendQueue[0]; var b = self.sendQueue[0];
if (b == END_OF_FILE) {
self._shutdown();
break;
}
if (b.sent == b.used) { if (b.sent == b.used) {
// this can be improved - save the buffer for later? // this can be improved - save the buffer for later?
self.sendQueue.shift() self.sendQueue.shift()
@ -315,7 +334,7 @@ Socket.prototype.forceClose = function (exception) {
this.writeWatcher.stop(); this.writeWatcher.stop();
this.readWatcher.stop(); this.readWatcher.stop();
close(this.fd); close(this.fd);
debug('close peer ' + this.fd); debug('close socket ' + this.fd);
this.fd = null; this.fd = null;
this.emit('close', exception); this.emit('close', exception);
} }
@ -325,30 +344,16 @@ Socket.prototype.forceClose = function (exception) {
Socket.prototype._shutdown = function () { Socket.prototype._shutdown = function () {
if (this.writable) { if (this.writable) {
this.writable = false; this.writable = false;
shutdown(this.fd, "write"); shutdown(this.fd, 'write');
} }
}; };
Socket.prototype.close = function () { Socket.prototype.close = function () {
var self = this; if (this.writable) {
var closeMethod; if (this._sendQueueLast() != END_OF_FILE) {
if (self.readable && self.writable) { this.sendQueue.push(END_OF_FILE);
closeMethod = self._shutdown; this.flush();
} else if (!self.readable && self.writable) {
// already got EOF
closeMethod = self.forceClose;
}
// In the case we've already shutdown write side,
// but haven't got EOF: ignore. In the case we're
// fully closed already: ignore.
if (closeMethod) {
if (self.sendQueueSize == 0) {
// no queue. just shut down the socket.
closeMethod();
} else {
self.addListener("drain", closeMethod);
} }
} }
}; };

2
src/node_net2.cc

@ -264,7 +264,7 @@ static Handle<Value> Shutdown(const Arguments& args) {
int how = SHUT_WR; int how = SHUT_WR;
if (args[1]->IsString()) { if (args[1]->IsString()) {
String::Utf8Value t(args[0]->ToString()); String::Utf8Value t(args[1]->ToString());
if (0 == strcasecmp(*t, "write")) { if (0 == strcasecmp(*t, "write")) {
how = SHUT_WR; how = SHUT_WR;
} else if (0 == strcasecmp(*t, "read")) { } else if (0 == strcasecmp(*t, "read")) {

26
test-net-server.js

@ -5,27 +5,27 @@ process.Buffer.prototype.toString = function () {
var sys = require("sys"); var sys = require("sys");
var net = require("./lib/net"); var net = require("./lib/net");
var server = new net.Server(function (stream) { var server = new net.Server(function (socket) {
sys.puts("connection (" + stream.fd + "): " sys.puts("connection (" + socket.fd + "): "
+ stream.remoteAddress + socket.remoteAddress
+ " port " + " port "
+ stream.remotePort + socket.remotePort
); );
sys.puts("server fd: " + server.fd); sys.puts("server fd: " + server.fd);
stream.addListener("receive", function (b) { socket.addListener("data", function (b) {
stream.send("pong ascii\r\n", "ascii"); socket.send("pong ascii\r\n", "ascii");
stream.send(b); socket.send(b);
stream.send("pong utf8\r\n", "utf8"); socket.send("pong utf8\r\n", "utf8");
}); });
stream.addListener('drain', function () { socket.addListener("eof", function () {
sys.puts("server-side socket drain"); sys.puts("server peer eof");
socket.close();
}); });
stream.addListener("eof", function () { socket.addListener('drain', function () {
sys.puts("server peer eof"); sys.puts("server-side socket drain");
stream.close();
}); });
}); });
server.listen(8000); server.listen(8000);

Loading…
Cancel
Save