|
|
@ -243,10 +243,10 @@ var buffers = new FreeList("buffer", 100, function (l) { |
|
|
|
|
|
|
|
|
|
|
|
// Allocated on demand.
|
|
|
|
var recvBuffer = null; |
|
|
|
function allocRecvBuffer () { |
|
|
|
recvBuffer = new Buffer(40*1024); |
|
|
|
recvBuffer.used = 0; |
|
|
|
var pool = null; |
|
|
|
function allocNewPool () { |
|
|
|
pool = new Buffer(40*1024); |
|
|
|
pool.used = 0; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -267,27 +267,27 @@ function _doFlush () { |
|
|
|
function initStream (self) { |
|
|
|
self._readWatcher = ioWatchers.alloc(); |
|
|
|
self._readWatcher.callback = function () { |
|
|
|
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
|
|
|
// most of the recvBuffer, allocate a new one.
|
|
|
|
if (recvBuffer) { |
|
|
|
if (recvBuffer.length - recvBuffer.used < 128) { |
|
|
|
// discard the old recvBuffer. Can't add to the free list because
|
|
|
|
// If this is the first recv (pool doesn't exist) or we've used up
|
|
|
|
// most of the pool, allocate a new one.
|
|
|
|
if (pool) { |
|
|
|
if (pool.length - pool.used < 128) { |
|
|
|
// discard the old pool. Can't add to the free list because
|
|
|
|
// users might have refernces to slices on it.
|
|
|
|
recvBuffer = null; |
|
|
|
allocRecvBuffer(); |
|
|
|
pool = null; |
|
|
|
allocNewPool(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
allocRecvBuffer(); |
|
|
|
allocNewPool(); |
|
|
|
} |
|
|
|
|
|
|
|
//debug('recvBuffer.used ' + recvBuffer.used);
|
|
|
|
//debug('pool.used ' + pool.used);
|
|
|
|
var bytesRead; |
|
|
|
|
|
|
|
try { |
|
|
|
bytesRead = read(self.fd, |
|
|
|
recvBuffer, |
|
|
|
recvBuffer.used, |
|
|
|
recvBuffer.length - recvBuffer.used); |
|
|
|
pool, |
|
|
|
pool.used, |
|
|
|
pool.length - pool.used); |
|
|
|
} catch (e) { |
|
|
|
self.forceClose(e); |
|
|
|
return; |
|
|
@ -307,30 +307,30 @@ function initStream (self) { |
|
|
|
|
|
|
|
timeout.active(self); |
|
|
|
|
|
|
|
var start = recvBuffer.used; |
|
|
|
var end = recvBuffer.used + bytesRead; |
|
|
|
var start = pool.used; |
|
|
|
var end = pool.used + bytesRead; |
|
|
|
|
|
|
|
if (!self._encoding) { |
|
|
|
if (self._events && self._events['data']) { |
|
|
|
// emit a slice
|
|
|
|
self.emit('data', recvBuffer.slice(start, end)); |
|
|
|
self.emit('data', pool.slice(start, end)); |
|
|
|
} |
|
|
|
|
|
|
|
// Optimization: emit the original buffer with end points
|
|
|
|
if (self.ondata) self.ondata(recvBuffer, start, end); |
|
|
|
if (self.ondata) self.ondata(pool, start, end); |
|
|
|
} else { |
|
|
|
// TODO remove me - we should only output Buffer
|
|
|
|
|
|
|
|
var string; |
|
|
|
switch (self._encoding) { |
|
|
|
case 'utf8': |
|
|
|
string = recvBuffer.utf8Slice(start, end); |
|
|
|
string = pool.utf8Slice(start, end); |
|
|
|
break; |
|
|
|
case 'ascii': |
|
|
|
string = recvBuffer.asciiSlice(start, end); |
|
|
|
string = pool.asciiSlice(start, end); |
|
|
|
break; |
|
|
|
case 'binary': |
|
|
|
string = recvBuffer.binarySlice(start, end); |
|
|
|
string = pool.binarySlice(start, end); |
|
|
|
break; |
|
|
|
default: |
|
|
|
throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer'); |
|
|
@ -339,7 +339,7 @@ function initStream (self) { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
recvBuffer.used += bytesRead; |
|
|
|
pool.used += bytesRead; |
|
|
|
} |
|
|
|
}; |
|
|
|
self.readable = false; |
|
|
@ -501,26 +501,26 @@ Stream.prototype.write = function (data, encoding) { |
|
|
|
|
|
|
|
//debug('write string :' + JSON.stringify(data));
|
|
|
|
|
|
|
|
if (!recvBuffer) allocRecvBuffer(); |
|
|
|
if (!pool) allocNewPool(); |
|
|
|
|
|
|
|
if (recvBuffer.length - recvBuffer.used < bytes) { |
|
|
|
if (pool.length - pool.used < bytes) { |
|
|
|
// not enough room - go to slow case
|
|
|
|
return self._queueWrite(data, encoding); |
|
|
|
} |
|
|
|
|
|
|
|
var charsWritten; |
|
|
|
if (encoding == 'utf8') { |
|
|
|
recvBuffer.utf8Write(data, recvBuffer.used); |
|
|
|
pool.utf8Write(data, pool.used); |
|
|
|
} else if (encoding == 'ascii') { |
|
|
|
// ascii
|
|
|
|
recvBuffer.asciiWrite(data, recvBuffer.used); |
|
|
|
pool.asciiWrite(data, pool.used); |
|
|
|
} else { |
|
|
|
// binary
|
|
|
|
recvBuffer.binaryWrite(data, recvBuffer.used); |
|
|
|
pool.binaryWrite(data, pool.used); |
|
|
|
} |
|
|
|
|
|
|
|
buffer = recvBuffer; |
|
|
|
off = recvBuffer.used; |
|
|
|
buffer = pool; |
|
|
|
off = pool.used; |
|
|
|
len = bytes; |
|
|
|
} |
|
|
|
|
|
|
@ -536,8 +536,8 @@ Stream.prototype.write = function (data, encoding) { |
|
|
|
|
|
|
|
//debug('wrote ' + bytesWritten);
|
|
|
|
|
|
|
|
// Note: if using the recvBuffer - we don't need to increase
|
|
|
|
// recvBuffer.used because it was all sent. Just reuse that space.
|
|
|
|
// Note: if using the pool - we don't need to increase
|
|
|
|
// pool.used because it was all sent. Just reuse that space.
|
|
|
|
|
|
|
|
if (bytesWritten == len) return true; |
|
|
|
|
|
|
@ -548,8 +548,8 @@ Stream.prototype.write = function (data, encoding) { |
|
|
|
data.used = data.length; |
|
|
|
} else { |
|
|
|
// string
|
|
|
|
recvBuffer.used += bytesWritten; |
|
|
|
data = recvBuffer.slice(off+bytesWritten, off+len+bytesWritten); |
|
|
|
pool.used += bytesWritten; |
|
|
|
data = pool.slice(off+bytesWritten, off+len+bytesWritten); |
|
|
|
data.sent = 0; |
|
|
|
data.used = data.length; |
|
|
|
} |
|
|
@ -733,7 +733,7 @@ Stream.prototype.resume = function () { |
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype.forceClose = function (exception) { |
|
|
|
// recvBuffer is shared between sockets, so don't need to free it here.
|
|
|
|
// pool is shared between sockets, so don't need to free it here.
|
|
|
|
var self = this; |
|
|
|
|
|
|
|
var b; |
|
|
|