Browse Source

Better flushing

Lost Utf8 support. Waiting for
Ryan Dahl 15 years ago
  1. 182
  2. 4


@ -398,73 +398,6 @@ Object.defineProperty(Stream.prototype, 'readyState', {
// Here's the deal. Character encodings are hard. We need to take javascript
// strings and turn them into raw binary to send them to socket. Javascript
// strings are pure unicode (I think V8 uses 16-bit arrays to hold them).
// So an encoding needs to be given to write it out to socket - this is
// usually 'utf8'.
// This function, encodeString, takes a buffer and writes the string to it
// starting at buffer.used. If it could fit the entire string into the
// buffer then it increases the buffer's .used member and returns buffer.
// Otherwise it creates a new buffer large enough to fit the entire string,
// writes that string into the new buffer, and then returns it.
function encodeString (buffer, string, encoding) {
encoding = (encoding || 'utf8').toLowerCase();
var bytesWritten;
if (string.length < buffer.length - buffer.used) {
// Try to write
if (encoding == 'utf8' || encoding == 'utf-8') {
bytesWritten = buffer.utf8Write(string, buffer.used);
debug('wrote ' + bytesWritten + ' utf8 bytes to buffer');
if (buffer[bytesWritten-1] == 0) {
// wrote the whole string.
buffer.used += bytesWritten-1;
return buffer;
} else {
if (encoding == 'ascii') {
bytesWritten = buffer.asciiWrite(string, buffer.used);
buffer.used += bytesWritten; // bytesWritten-1 ?
} else {
bytesWritten = buffer.binaryWrite(string, buffer.used);
buffer.used += bytesWritten;
return buffer;
// Couldn't fit the string in the given buffer. Instead of partially
// writing it and then slicing the string, we'll do something stupid and
// just create a new temporary buffer just for that string.
// (The reasoning is: slicing is expensive.)
var byteLength = Buffer.byteLength(string, encoding);
var newBuffer = new Buffer(byteLength);
debug('alloced new buffer for string : ' + newBuffer.length);
if (encoding == 'utf8' || encoding == 'utf-8') {
bytesWritten = newBuffer.utf8Write(string, 0);
} else if (encoding == 'ascii') {
bytesWritten = newBuffer.asciiWrite(string, 0);
} else {
bytesWritten = newBuffer.binaryWrite(string, 0);
debug('filled up new buffer');
assert(bytesWritten == byteLength);
newBuffer.used = byteLength;
newBuffer.sent = 0;
return newBuffer;
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
@ -479,39 +412,80 @@ Stream.prototype.write = function (data, encoding) {
return false;
} else {
// Fast.
// The most common case. There is no write queue. Just push the data
// directly to the socket.
return this._writeOut(data, encoding);
// Directly writes the data to socket.
// unshifts remainder onto _writeQueue.
// Steps:
// 1. If it's a string, write it to the `pool`. (If not space remains
// on the pool make a new one.)
// 2. Write data to socket. Return true if flushed.
// 3. Slice out remaining
// 4. Unshift remaining onto _writeQueue. Return false.
Stream.prototype._writeOut = function (data, encoding) {
if (!this.writable) throw new Error('Stream is not writable');
// The most common case. There is no write queue. Just push the data
// directly to the socket.
var buffer, off, len;
var bytesWritten, charsWritten;
if (typeof data == 'string') {
if (!pool) allocNewPool();
var startOffset = pool.used;
buffer = encodeString(pool, data, encoding);
off = (buffer == pool ? startOffset : 0);
len = buffer.used - off;
} else {
var queuedData = false;
if (typeof data != 'string') {
// 'data' is a buffer, ignore 'encoding'
buffer = data;
off = data.sent || 0;
off = 0;
len = data.length;
} else {
assert(typeof data == 'string')
if (!pool || pool.length - pool.used < 128) {
pool = null;
if (encoding == 'utf8' || encoding == 'utf-8') {
bytesWritten = pool.utf8Write(data, pool.used);
charsWritten = bytesWritten; // XXX FIXME
} else if (encoding == 'ascii') {
bytesWritten = pool.asciiWrite(data, pool.used);
charsWritten = bytesWritten;
assert(charsWritten <= data.length);
} else {
bytesWritten = pool.binaryWrite(data, pool.used);
charsWritten = bytesWritten;
assert(charsWritten <= data.length);
assert(bytesWritten > 0);
buffer = pool;
len = bytesWritten;
off = pool.used;
pool.used += bytesWritten;
debug('wrote ' + bytesWritten + ' bytes to pool');
if (charsWritten != data.length) {
//debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n");
// Unshift whatever didn't fit onto the buffer
queuedData = true;
debug('write [fd, off, len] =' + JSON.stringify([this.fd, off, len]));
// Send the buffer.
var bytesWritten;
try {
bytesWritten = write(this.fd, buffer, off, len);
} catch (e) {
@ -519,49 +493,33 @@ Stream.prototype._writeOut = function (data, encoding) {
return false;
debug('wrote ' + bytesWritten);
debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");;
if (bytesWritten == len) {
// awesome. sent to buffer - save that space
buffer.used -= len;
return true;
// awesome. sent to buffer.
buffer.used -= len; // Optimization - save the space
if (queuedData) {
return false;
} else {
return true;
//sys.error('write incomplete ' + bytesWritten + ' < ' + len);
// Didn't write the entire thing to buffer.
// Need to wait for the socket to become available before trying again.
// Slice out the data left.
var leftOver = data.slice(off + bytesWritten, off + len);
leftOver.used = leftOver.length; // used the whole thing...
if (buffer == data) {
bytesWritten = bytesWritten || 0;
data = buffer.slice(bytesWritten, len);
data.sent = 0;
data.used = data.length;
} else if (buffer == pool) {
data = pool.slice(off + bytesWritten, off + len);
data.sent = 0;
data.used = data.length;
} else {
data = buffer;
data.sent = bytesWritten;
assert(typeof data.used == 'number');
assert(typeof data.sent == 'number');
// sys.error('data.used = ' + data.used);
// sys.error('data.sent = ' + data.sent);
// sys.error('bytes left, data.used - data.send = ' + (data.used - data.sent));
// sys.error('data.used = ' + data.used);
//if (!this._writeQueue) initWriteStream(this);
// data should be the next thing to write.
return false;


@ -969,7 +969,9 @@ static Handle<Value> Write(const Arguments& args) {
ssize_t written = write(fd, (char*)buffer->data() + off, len);
if (written < 0) {
if (errno == EAGAIN || errno == EINTR) return Null();
if (errno == EAGAIN || errno == EINTR) {
return scope.Close(Integer::New(0));
return ThrowException(ErrnoException(errno, "write"));
