Browse Source

stream: switch _writableState.buffer to queue

In cases where many small writes are made to a stream
lacking _writev, the array data structure backing the
WriteReq buffer would greatly increase GC pressure.

Specifically, in the fs.WriteStream case, the
clearBuffer routine would only clear a single WriteReq
from the buffer before exiting, but would cause the
entire backing array to be GC'd. Switching to [].shift
lessened pressure, but still the bulk of the time was
spent in memcpy.

This replaces that structure with a linked list-backed
queue so that adding and removing from the queue is O(1).
In the _writev case, collecting the buffer requires an
O(N) loop over the buffer, but that was already being
performed to collect callbacks, so slowdown should be
neglible.

PR-URL: https://github.com/joyent/node/pull/8826
Reviewed-by: Timothy J Fontaine <tjfontaine@gmail.com>
Reviewed-by: Trevor Norris <trev.norris@gmail.com>
v0.11.15-release
Chris Dickinson 10 years ago
parent
commit
91586661c9
  1. 68
      lib/_stream_writable.js
  2. 2
      lib/net.js
  3. 2
      test/simple/test-stream2-transform.js

68
lib/_stream_writable.js

@ -28,6 +28,7 @@ Writable.WritableState = WritableState;
var util = require('util');
var Stream = require('stream');
var debug = util.debuglog('stream');
util.inherits(Writable, Stream);
@ -35,6 +36,7 @@ function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
this.next = null;
}
function WritableState(options, stream) {
@ -109,7 +111,8 @@ function WritableState(options, stream) {
// the amount that is being written when _write is called.
this.writelen = 0;
this.buffer = [];
this.bufferedRequest = null;
this.lastBufferedRequest = null;
// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
@ -123,6 +126,23 @@ function WritableState(options, stream) {
this.errorEmitted = false;
}
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
var current = this.bufferedRequest;
var out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
};
Object.defineProperty(WritableState.prototype, 'buffer', {
get: util.deprecate(function() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use ' +
'_writableState.getBuffer() instead.')
});
function Writable(options) {
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
@ -216,7 +236,7 @@ Writable.prototype.uncork = function() {
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
state.buffer.length)
state.bufferedRequest)
clearBuffer(this, state);
}
};
@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;
if (state.writing || state.corked)
state.buffer.push(new WriteReq(chunk, encoding, cb));
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
}
else
doWrite(stream, state, false, len, chunk, encoding, cb);
@ -313,7 +340,7 @@ function onwrite(stream, er) {
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.buffer.length) {
state.bufferedRequest) {
clearBuffer(stream, state);
}
@ -349,17 +376,23 @@ function onwriteDrain(stream, state) {
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;
if (stream._writev && state.buffer.length > 1) {
if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
for (var c = 0; c < state.buffer.length; c++)
cbs.push(state.buffer[c].callback);
while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
entry = entry.next;
}
// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
@ -367,34 +400,29 @@ function clearBuffer(stream, state) {
});
// Clear buffer
state.buffer = [];
} else {
// Slow case, write chunks one-by-one
for (var c = 0; c < state.buffer.length; c++) {
var entry = state.buffer[c];
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
}
if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
if (entry === null)
state.lastBufferedRequest = null;
}
state.bufferedRequest = entry;
state.bufferProcessing = false;
}
@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(stream, state) {
return (state.ending &&
state.length === 0 &&
state.buffer.length === 0 &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
}

2
lib/net.js

@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
data = this._pendingData,
encoding = this._pendingEncoding;
state.buffer.forEach(function(el) {
state.getBuffer().forEach(function(el) {
if (util.isBuffer(el.chunk))
bytes += el.chunk.length;
else

2
test/simple/test-stream2-transform.js

@ -81,7 +81,7 @@ test('writable side consumption', function(t) {
t.equal(tx._readableState.length, 10);
t.equal(transformed, 10);
t.equal(tx._transformState.writechunk.length, 5);
t.same(tx._writableState.buffer.map(function(c) {
t.same(tx._writableState.getBuffer().map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);

Loading…
Cancel
Save