Browse Source

Update net.js for new stream API

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
264a67aed2
  1. 87
      lib/net.js
  2. 2
      test/simple/test-net-fd-passing.js
  3. 8
      test/simple/test-net-pingpong.js

87
lib/net.js

@ -8,6 +8,7 @@ function debug (x) {
} }
var Buffer = process.Buffer;
var IOWatcher = process.IOWatcher; var IOWatcher = process.IOWatcher;
var assert = process.assert; var assert = process.assert;
var socket = process.socket; var socket = process.socket;
@ -61,13 +62,13 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
var nb = 0; var nb = 0;
var buffers = new FreeList("buffer", 100, function (l) { var buffers = new FreeList("buffer", 100, function (l) {
return new process.Buffer(500); return new Buffer(500);
}); });
// Allocated on demand. // Allocated on demand.
var recvBuffer = null; var recvBuffer = null;
function allocRecvBuffer () { function allocRecvBuffer () {
recvBuffer = new process.Buffer(40*1024); recvBuffer = new Buffer(40*1024);
recvBuffer.used = 0; recvBuffer.used = 0;
} }
@ -138,19 +139,20 @@ function initSocket (self) {
}; };
self.readable = false; self.readable = false;
self.sendQueue = []; // queue of buffers that need to be written to socket self._writeQueue = []; // queue of buffers that need to be written to socket
// XXX use link list? // XXX use link list?
self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length! self._writeQueueSize = 0; // in bytes, not to be confused with _writeQueue.length!
self.sendMessageQueueSize = 0; // number of messages remaining to be sent self._writeMessageQueueSize = 0; // number of messages remaining to be sent
self._doFlush = function () { self._doFlush = function () {
// Socket becomes writeable on connect() but don't flush if there's // Socket becomes writeable on connect() but don't flush if there's
// nothing actually to write // nothing actually to write
if ((self.sendQueueSize == 0) && (self.sendMessageQueueSize == 0)) { if ((self._writeQueueSize == 0) && (self._writeMessageQueueSize == 0)) {
return; return;
} }
if (self.flush()) { if (self.flush()) {
assert(self.sendQueueSize == 0); assert(self._writeQueueSize == 0);
assert(self.sendMessageQueueSize == 0); assert(self._writeMessageQueueSize == 0);
if (self._events && self._events['drain']) self.emit("drain"); if (self._events && self._events['drain']) self.emit("drain");
if (self.ondrain) self.ondrain(); // Optimization if (self.ondrain) self.ondrain(); // Optimization
@ -195,19 +197,19 @@ Socket.prototype._allocateSendBuffer = function () {
b.used = 0; b.used = 0;
b.sent = 0; b.sent = 0;
b.isMsg = false; b.isMsg = false;
this.sendQueue.push(b); this._writeQueue.push(b);
return b; return b;
}; };
Socket.prototype._sendString = function (data, encoding) { Socket.prototype._writeString = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
var buffer; var buffer;
if (self.sendQueue.length == 0) { if (self._writeQueue.length == 0) {
buffer = self._allocateSendBuffer(); buffer = self._allocateSendBuffer();
} else { } else {
buffer = self._sendQueueLast(); buffer = self.__writeQueueLast();
if (buffer.used == buffer.length) { if (buffer.used == buffer.length) {
buffer = self._allocateSendBuffer(); buffer = self._allocateSendBuffer();
} }
@ -230,7 +232,7 @@ Socket.prototype._sendString = function (data, encoding) {
charsWritten = buffer.utf8Write(data, charsWritten = buffer.utf8Write(data,
buffer.used, buffer.used,
buffer.length - buffer.used); buffer.length - buffer.used);
bytesWritten = process.Buffer.utf8ByteLength(data.slice(0, charsWritten)); bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten));
} else { } else {
// ascii // ascii
buffer.isFd = false; buffer.isFd = false;
@ -242,9 +244,9 @@ Socket.prototype._sendString = function (data, encoding) {
buffer.used += bytesWritten; buffer.used += bytesWritten;
if (buffer.isFd) { if (buffer.isFd) {
self.sendMessageQueueSize += 1; self._writeMessageQueueSize += 1;
} else { } else {
self.sendQueueSize += bytesWritten; self._writeQueueSize += bytesWritten;
} }
debug('charsWritten ' + charsWritten); debug('charsWritten ' + charsWritten);
@ -252,40 +254,45 @@ Socket.prototype._sendString = function (data, encoding) {
// If we didn't finish, then recurse with the rest of the string. // If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) { if (charsWritten < data.length) {
debug('recursive send'); debug('recursive write');
self._sendString(data.slice(charsWritten), encoding); self._writeString(data.slice(charsWritten), encoding);
} }
}; };
Socket.prototype._sendQueueLast = function () { Socket.prototype.__writeQueueLast = function () {
return this.sendQueue.length > 0 ? this.sendQueue[this.sendQueue.length-1] return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null; : null;
}; };
Socket.prototype.send = function () {
throw new Error('send renamed to write');
};
// Returns true if all the data was flushed to socket. Returns false if // Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will // something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket. // signal when it has been finally flushed to socket.
Socket.prototype.send = function (data, encoding) { Socket.prototype.write = function (data, encoding) {
var self = this; var self = this;
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
if (self._sendQueueLast() == END_OF_FILE) { if (self.__writeQueueLast() == END_OF_FILE) {
throw new Error('socket.close() called already; cannot write.'); throw new Error('socket.close() called already; cannot write.');
} }
if (typeof(data) == 'string') { if (typeof(data) == 'string') {
self._sendString(data, encoding); self._writeString(data, encoding);
} else { } else {
// data is a process.Buffer // data is a Buffer
// walk through the sendQueue, find the first empty buffer // walk through the _writeQueue, find the first empty buffer
//var inserted = false; //var inserted = false;
data.sent = 0; data.sent = 0;
data.used = data.length; data.used = data.length;
self.sendQueue.push(data); self._writeQueue.push(data);
self.sendQueueSize += data.used; self._writeQueueSize += data.used;
} }
return this.flush(); return this.flush();
}; };
@ -296,7 +303,7 @@ Socket.prototype.sendFD = function(socketToPass) {
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
if (self._sendQueueLast() == END_OF_FILE) { if (self.__writeQueueLast() == END_OF_FILE) {
throw new Error('socket.close() called already; cannot write.'); throw new Error('socket.close() called already; cannot write.');
} }
@ -308,7 +315,7 @@ Socket.prototype.sendFD = function(socketToPass) {
throw new Error('Provided arg is not a socket'); throw new Error('Provided arg is not a socket');
} }
return self.send(socketToPass.fd.toString(), "fd"); return self.write(socketToPass.fd.toString(), "fd");
}; };
@ -318,10 +325,10 @@ Socket.prototype.flush = function () {
var self = this; var self = this;
var bytesWritten; var bytesWritten;
while (self.sendQueue.length) { while (self._writeQueue.length) {
if (!self.writable) throw new Error('Socket is not writable'); if (!self.writable) throw new Error('Socket is not writable');
var b = self.sendQueue[0]; var b = self._writeQueue[0];
if (b == END_OF_FILE) { if (b == END_OF_FILE) {
self._shutdown(); self._shutdown();
@ -330,7 +337,7 @@ Socket.prototype.flush = function () {
if (b.sent == b.used) { if (b.sent == b.used) {
// shift! // shift!
self.sendQueue.shift(); self._writeQueue.shift();
buffers.free(b); buffers.free(b);
continue; continue;
} }
@ -340,7 +347,7 @@ Socket.prototype.flush = function () {
try { try {
if (b.isFd) { if (b.isFd) {
fdToSend = parseInt(b.asciiSlice(b.sent, b.used - b.sent)); fdToSend = parseInt(b.asciiSlice(b.sent, b.used - b.sent));
bytesWritten = sendFD(self.fd, fdToSend); bytesWritten = writeFD(self.fd, fdToSend);
} else { } else {
bytesWritten = write(self.fd, bytesWritten = write(self.fd,
b, b,
@ -355,16 +362,16 @@ Socket.prototype.flush = function () {
if (bytesWritten === null) { if (bytesWritten === null) {
// could not flush everything // could not flush everything
self._writeWatcher.start(); self._writeWatcher.start();
assert(self.sendQueueSize > 0); assert(self._writeQueueSize > 0);
return false; return false;
} }
if (b.isFd) { if (b.isFd) {
b.sent = b.used; b.sent = b.used;
self.sendMessageQueueSize -= 1; self._writeMessageQueueSize -= 1;
debug('sent fd: ' + fdToSend); debug('sent fd: ' + fdToSend);
} else { } else {
b.sent += bytesWritten; b.sent += bytesWritten;
self.sendQueueSize -= bytesWritten; self._writeQueueSize -= bytesWritten;
debug('bytes sent: ' + b.sent); debug('bytes sent: ' + b.sent);
} }
} }
@ -446,9 +453,9 @@ Socket.prototype.forceClose = function (exception) {
// recvBuffer is shared between sockets, so don't need to free it here. // recvBuffer is shared between sockets, so don't need to free it here.
var b; var b;
while (this.sendQueue.length) { while (this._writeQueue.length) {
b = this.sendQueue.shift(); b = this._writeQueue.shift();
if (b instanceof process.Buffer) buffers.free(b); if (b instanceof Buffer) buffers.free(b);
} }
if (this._writeWatcher) { if (this._writeWatcher) {
@ -489,8 +496,8 @@ Socket.prototype._shutdown = function () {
Socket.prototype.close = function () { Socket.prototype.close = function () {
if (this.writable) { if (this.writable) {
if (this._sendQueueLast() != END_OF_FILE) { if (this.__writeQueueLast() != END_OF_FILE) {
this.sendQueue.push(END_OF_FILE); this._writeQueue.push(END_OF_FILE);
this.flush(); this.flush();
} }
} }

2
test/simple/test-net-fd-passing.js

@ -29,7 +29,7 @@ function fdPassingTest(path, port) {
var client = net.createConnection(port); var client = net.createConnection(port);
client.addListener("connect", function() { client.addListener("connect", function() {
client.send(message); client.write(message);
}); });
client.addListener("data", function(data) { client.addListener("data", function(data) {

8
test/simple/test-net-pingpong.js

@ -25,7 +25,7 @@ function pingPongTest (port, host) {
assert.equal(true, socket.readable); assert.equal(true, socket.readable);
assert.equal(true, count <= N); assert.equal(true, count <= N);
if (/PING/.exec(data)) { if (/PING/.exec(data)) {
socket.send("PONG"); socket.write("PONG");
} }
}); });
@ -50,7 +50,7 @@ function pingPongTest (port, host) {
client.addListener("connect", function () { client.addListener("connect", function () {
assert.equal(true, client.readable); assert.equal(true, client.readable);
assert.equal(true, client.writable); assert.equal(true, client.writable);
client.send("PING"); client.write("PING");
}); });
client.addListener("data", function (data) { client.addListener("data", function (data) {
@ -69,10 +69,10 @@ function pingPongTest (port, host) {
} }
if (count < N) { if (count < N) {
client.send("PING"); client.write("PING");
} else { } else {
sent_final_ping = true; sent_final_ping = true;
client.send("PING"); client.write("PING");
client.close(); client.close();
} }
}); });

Loading…
Cancel
Save