|
@ -501,6 +501,83 @@ Stream.prototype.write = function (data, encoding) { |
|
|
|
|
|
|
|
|
if (!self.writable) throw new Error('Stream is not writable'); |
|
|
if (!self.writable) throw new Error('Stream is not writable'); |
|
|
|
|
|
|
|
|
|
|
|
if (self._writeQueue && self._writeQueue.length) { |
|
|
|
|
|
return self._writeQueued(data, encoding); // slow
|
|
|
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
// The most common case. There is no write queue. Just push the data
|
|
|
|
|
|
// directly to the socket.
|
|
|
|
|
|
|
|
|
|
|
|
var bytesWritten; |
|
|
|
|
|
var buffer = data, off = 0, len = data.length; |
|
|
|
|
|
|
|
|
|
|
|
if (typeof data == 'string') { |
|
|
|
|
|
encoding = (encoding || 'utf8').toLowerCase(); |
|
|
|
|
|
var bytes = encoding == 'utf8' ? Buffer.utf8ByteLength(data) : data.length; |
|
|
|
|
|
|
|
|
|
|
|
//debug('write string :' + JSON.stringify(data));
|
|
|
|
|
|
|
|
|
|
|
|
if (!recvBuffer) allocRecvBuffer(); |
|
|
|
|
|
|
|
|
|
|
|
if (recvBuffer.length - recvBuffer.used < bytes) { |
|
|
|
|
|
// not enough room - go to slow case
|
|
|
|
|
|
return self._writeQueued(data, encoding); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var charsWritten; |
|
|
|
|
|
if (encoding == 'utf8') { |
|
|
|
|
|
recvBuffer.utf8Write(data, recvBuffer.used); |
|
|
|
|
|
} else { |
|
|
|
|
|
// ascii
|
|
|
|
|
|
recvBuffer.asciiWrite(data, recvBuffer.used); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
buffer = recvBuffer; |
|
|
|
|
|
off = recvBuffer.used; |
|
|
|
|
|
len = bytes; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//debug('write [fd, off, len] =' + JSON.stringify([self.fd, off, len]));
|
|
|
|
|
|
|
|
|
|
|
|
// Send the buffer.
|
|
|
|
|
|
try { |
|
|
|
|
|
bytesWritten = write(self.fd, buffer, off, len); |
|
|
|
|
|
} catch (e) { |
|
|
|
|
|
self.forceClose(e); |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//debug('wrote ' + bytesWritten);
|
|
|
|
|
|
|
|
|
|
|
|
// Note: if using the recvBuffer - we don't need to increase
|
|
|
|
|
|
// recvBuffer.used because it was all sent. Just reuse that space.
|
|
|
|
|
|
|
|
|
|
|
|
if (bytesWritten == len) return true; |
|
|
|
|
|
|
|
|
|
|
|
//debug('write incomplete ' + bytesWritten + ' < ' + len);
|
|
|
|
|
|
|
|
|
|
|
|
if (buffer == data) { |
|
|
|
|
|
data.sent = bytesWritten || 0; |
|
|
|
|
|
data.used = data.length; |
|
|
|
|
|
} else { |
|
|
|
|
|
// string
|
|
|
|
|
|
recvBuffer.used += bytesWritten; |
|
|
|
|
|
data = recvBuffer.slice(off+bytesWritten, off+len+bytesWritten); |
|
|
|
|
|
data.sent = 0; |
|
|
|
|
|
data.used = data.length; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//if (!self._writeQueue) initWriteStream(self);
|
|
|
|
|
|
self._writeQueue.push(data); |
|
|
|
|
|
self._writeQueueSize += data.used; |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Stream.prototype._writeQueued = function (data, encoding) { |
|
|
|
|
|
//debug('_writeQueued');
|
|
|
|
|
|
var self = this; |
|
|
|
|
|
|
|
|
if (self.__writeQueueLast() == END_OF_FILE) { |
|
|
if (self.__writeQueueLast() == END_OF_FILE) { |
|
|
throw new Error('socket.close() called already; cannot write.'); |
|
|
throw new Error('socket.close() called already; cannot write.'); |
|
|
} |
|
|
} |
|
@ -597,7 +674,7 @@ Stream.prototype.flush = function () { |
|
|
} else { |
|
|
} else { |
|
|
b.sent += bytesWritten; |
|
|
b.sent += bytesWritten; |
|
|
self._writeQueueSize -= bytesWritten; |
|
|
self._writeQueueSize -= bytesWritten; |
|
|
debug('bytes sent: ' + b.sent); |
|
|
//debug('bytes sent: ' + b.sent);
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if (self._writeWatcher) self._writeWatcher.stop(); |
|
|
if (self._writeWatcher) self._writeWatcher.stop(); |
|
@ -654,7 +731,7 @@ Stream.prototype.connect = function () { |
|
|
|
|
|
|
|
|
if (port >= 0) { |
|
|
if (port >= 0) { |
|
|
self.fd = socket('tcp'); |
|
|
self.fd = socket('tcp'); |
|
|
debug('new fd = ' + self.fd); |
|
|
//debug('new fd = ' + self.fd);
|
|
|
self.type = 'tcp'; |
|
|
self.type = 'tcp'; |
|
|
// TODO dns resolution on arguments[1]
|
|
|
// TODO dns resolution on arguments[1]
|
|
|
var port = arguments[0]; |
|
|
var port = arguments[0]; |
|
@ -726,7 +803,7 @@ Stream.prototype.forceClose = function (exception) { |
|
|
// FIXME Bug when this.fd == 0
|
|
|
// FIXME Bug when this.fd == 0
|
|
|
if (this.fd) { |
|
|
if (this.fd) { |
|
|
close(this.fd); |
|
|
close(this.fd); |
|
|
debug('close ' + this.fd); |
|
|
//debug('close ' + this.fd);
|
|
|
this.fd = null; |
|
|
this.fd = null; |
|
|
process.nextTick(function () { |
|
|
process.nextTick(function () { |
|
|
if (exception) self.emit('error', exception); |
|
|
if (exception) self.emit('error', exception); |
|
|