@ -25,6 +25,7 @@ var timers = require('timers');
var util = require ( 'util' ) ;
var assert = require ( 'assert' ) ;
var cares = process . binding ( 'cares_wrap' ) ;
var uv = process . binding ( 'uv' ) ;
var cluster ;
var errnoException = util . _ errnoException ;
@ -206,12 +207,11 @@ function onSocketFinish() {
if ( ! this . _ handle || ! this . _ handle . shutdown )
return this . destroy ( ) ;
var shutdownReq = this . _ handle . shutdown ( ) ;
var req = { oncomplete : afterShutdown } ;
var err = this . _ handle . shutdown ( req ) ;
if ( ! shutdownReq )
return this . _ destroy ( errnoException ( process . _ errno , 'shutdown' ) ) ;
shutdownReq . oncomplete = afterShutdown ;
if ( err )
return this . _ destroy ( errnoException ( err , 'shutdown' ) ) ;
}
@ -338,7 +338,10 @@ Socket.prototype.setKeepAlive = function(setting, msecs) {
Socket . prototype . address = function ( ) {
if ( this . _ handle && this . _ handle . getsockname ) {
return this . _ handle . getsockname ( ) ;
var out = { } ;
var err = this . _ handle . getsockname ( out ) ;
// TODO(bnoordhuis) Check err and throw?
return out ;
}
return null ;
} ;
@ -381,9 +384,9 @@ Socket.prototype._read = function(n) {
// not already reading, start the flow
debug ( 'Socket._read readStart' ) ;
this . _ handle . reading = true ;
var r = this . _ handle . readStart ( ) ;
if ( r )
this . _ destroy ( errnoException ( process . _ errno , 'read' ) ) ;
var er r = this . _ handle . readStart ( ) ;
if ( er r)
this . _ destroy ( errnoException ( err , 'read' ) ) ;
}
} ;
@ -486,17 +489,16 @@ Socket.prototype.destroy = function(exception) {
// This function is called whenever the handle gets a
// buffer, or when there's an error reading.
function onread ( buffer ) {
function onread ( nread , buffer ) {
var handle = this ;
var self = handle . owner ;
var length = ! ! buffer ? buffer . length : 0 ;
assert ( handle === self . _ handle , 'handle != self._handle' ) ;
timers . _ unrefActive ( self ) ;
debug ( 'onread' , process . _ errno , length ) ;
debug ( 'onread' , nread ) ;
if ( buffer ) {
if ( nread > 0 ) {
debug ( 'got data' ) ;
// read success.
@ -504,16 +506,9 @@ function onread(buffer) {
// will prevent this from being called again until _read() gets
// called again.
// if we didn't get any bytes, that doesn't necessarily mean EOF.
// wait for the next one.
if ( length === 0 ) {
debug ( 'not any data, keep waiting' ) ;
return ;
}
// if it's not enough data, we'll just call handle.readStart()
// again right away.
self . bytesRead += length ;
self . bytesRead += nread ;
// Optimization: emit the original buffer with end points
var ret = true ;
@ -523,33 +518,41 @@ function onread(buffer) {
if ( handle . reading && ! ret ) {
handle . reading = false ;
debug ( 'readStop' ) ;
var r = handle . readStop ( ) ;
if ( r )
self . _ destroy ( errnoException ( process . _ errno , 'read' ) ) ;
var er r = handle . readStop ( ) ;
if ( er r)
self . _ destroy ( errnoException ( err , 'read' ) ) ;
}
return ;
}
} else if ( process . _ errno == 'EOF' ) {
debug ( 'EOF' ) ;
if ( self . _ readableState . length === 0 ) {
self . readable = false ;
maybeDestroy ( self ) ;
}
// if we didn't get any bytes, that doesn't necessarily mean EOF.
// wait for the next one.
if ( nread === 0 ) {
debug ( 'not any data, keep waiting' ) ;
return ;
}
if ( self . onend ) self . once ( 'end' , self . onend ) ;
// Error, possibly EOF.
if ( nread !== uv . UV_EOF ) {
return self . _ destroy ( errnoException ( nread , 'read' ) ) ;
}
// push a null to signal the end of data.
self . push ( null ) ;
debug ( 'EOF' ) ;
// internal end event so that we know that the actual socket
// is no longer readable, and we can start the shutdown
// procedure. No need to wait for all the data to be consumed.
self . emit ( '_socketEnd' ) ;
} else {
debug ( 'error' , process . _ errno ) ;
// Error
self . _ destroy ( errnoException ( process . _ errno , 'read' ) ) ;
if ( self . _ readableState . length === 0 ) {
self . readable = false ;
maybeDestroy ( self ) ;
}
if ( self . onend ) self . once ( 'end' , self . onend ) ;
// push a null to signal the end of data.
self . push ( null ) ;
// internal end event so that we know that the actual socket
// is no longer readable, and we can start the shutdown
// procedure. No need to wait for all the data to be consumed.
self . emit ( '_socketEnd' ) ;
}
@ -558,10 +561,10 @@ Socket.prototype._getpeername = function() {
return { } ;
}
if ( ! this . _ peername ) {
this . _ peername = this . _ handle . getpeername ( ) ;
if ( ! this . _ peername ) {
return { } ;
}
var out = { } ;
var err = this . _ handle . getpeername ( out ) ;
if ( err ) return { } ; // FIXME(bnoordhuis) Throw?
this . _ peername = out ;
}
return this . _ peername ;
} ;
@ -582,10 +585,10 @@ Socket.prototype._getsockname = function() {
return { } ;
}
if ( ! this . _ sockname ) {
this . _ sockname = this . _ handle . getsockname ( ) ;
if ( ! this . _ sockname ) {
return { } ;
}
var out = { } ;
var err = this . _ handle . getsockname ( out ) ;
if ( err ) return { } ; // FIXME(bnoordhuis) Throw?
this . _ sockname = out ;
}
return this . _ sockname ;
} ;
@ -630,7 +633,9 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
return false ;
}
var writeReq ;
var req = { oncomplete : afterWrite } ;
var err ;
if ( writev ) {
var chunks = new Array ( data . length << 1 ) ;
for ( var i = 0 ; i < data . length ; i ++ ) {
@ -640,28 +645,26 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
chunks [ i * 2 ] = chunk ;
chunks [ i * 2 + 1 ] = enc ;
}
var writeReq = this . _ handle . writev ( chunks ) ;
err = this . _ handle . writev ( req , chunks ) ;
// Retain chunks
if ( writeReq )
writeReq . _ chunks = chunks ;
if ( err === 0 ) req . _ chunks = chunks ;
} else {
var enc = Buffer . isBuffer ( data ) ? 'buffer' : encoding ;
var writeReq = createWriteReq ( this . _ handle , data , enc ) ;
err = createWriteReq ( req , this . _ handle , data , enc ) ;
}
if ( ! writeReq )
return this . _ destroy ( errnoException ( process . _ errno , 'write' ) , cb ) ;
if ( err )
return this . _ destroy ( errnoException ( err , 'write' ) , cb ) ;
writeReq . oncomplete = afterWrite ;
this . _ bytesDispatched += writeReq . bytes ;
this . _ bytesDispatched += req . bytes ;
// If it was entirely flushed, we can write some more right now.
// However, if more is left in the queue, then wait until that clears.
if ( this . _ handle . writeQueueSize === 0 )
cb ( ) ;
else
w riteR eq. cb = cb ;
req . cb = cb ;
} ;
@ -698,26 +701,26 @@ function getEncodingId(encoding) {
}
}
function createWriteReq ( handle , data , encoding ) {
function createWriteReq ( req , handle , data , encoding ) {
switch ( encoding ) {
case 'buffer' :
return handle . writeBuffer ( data ) ;
return handle . writeBuffer ( req , data ) ;
case 'utf8' :
case 'utf-8' :
return handle . writeUtf8String ( data ) ;
return handle . writeUtf8String ( req , data ) ;
case 'ascii' :
return handle . writeAsciiString ( data ) ;
return handle . writeAsciiString ( req , data ) ;
case 'ucs2' :
case 'ucs-2' :
case 'utf16le' :
case 'utf-16le' :
return handle . writeUcs2String ( data ) ;
return handle . writeUcs2String ( req , data ) ;
default :
return handle . writeBuffer ( new Buffer ( data , encoding ) ) ;
return handle . writeBuffer ( req , new Buffer ( data , encoding ) ) ;
}
}
@ -758,9 +761,10 @@ function afterWrite(status, handle, req) {
return ;
}
if ( status ) {
debug ( 'write failure' , errnoException ( process . _ errno , 'write' ) ) ;
self . _ destroy ( errnoException ( process . _ errno , 'write' ) , req . cb ) ;
if ( status < 0 ) {
var ex = errnoException ( status , 'write' ) ;
debug ( 'write failure' , ex ) ;
self . _ destroy ( ex , req . cb ) ;
return ;
}
@ -780,33 +784,31 @@ function connect(self, address, port, addressType, localAddress) {
assert . ok ( self . _ connecting ) ;
var err ;
if ( localAddress ) {
var r ;
if ( addressType == 6 ) {
r = self . _ handle . bind6 ( localAddress ) ;
er r = self . _ handle . bind6 ( localAddress ) ;
} else {
r = self . _ handle . bind ( localAddress ) ;
er r = self . _ handle . bind ( localAddress ) ;
}
if ( r ) {
self . _ destroy ( errnoException ( process . _ errno , 'bind' ) ) ;
if ( er r) {
self . _ destroy ( errnoException ( err , 'bind' ) ) ;
return ;
}
}
var connectReq ;
var req = { oncomplete : afterConnect } ;
if ( addressType == 6 ) {
connectReq = self . _ handle . connect6 ( address , port ) ;
err = self . _ handle . connect6 ( req , address , port ) ;
} else if ( addressType == 4 ) {
connectReq = self . _ handle . connect ( address , port ) ;
err = self . _ handle . connect ( req , address , port ) ;
} else {
connectReq = self . _ handle . connect ( address , afterConnect ) ;
err = self . _ handle . connect ( req , address , afterConnect ) ;
}
if ( connectReq !== null ) {
connectReq . oncomplete = afterConnect ;
} else {
self . _ destroy ( errnoException ( process . _ errno , 'connect' ) ) ;
if ( err ) {
self . _ destroy ( errnoException ( err , 'connect' ) ) ;
}
}
@ -937,7 +939,7 @@ function afterConnect(status, handle, req, readable, writable) {
} else {
self . _ connecting = false ;
self . _ destroy ( errnoException ( process . _ errno , 'connect' ) ) ;
self . _ destroy ( errnoException ( status , 'connect' ) ) ;
}
}
@ -992,7 +994,7 @@ function toNumber(x) { return (x = Number(x)) >= 0 ? x : false; }
var createServerHandle = exports . _ createServerHandle =
function ( address , port , addressType , fd ) {
var r = 0 ;
var er r = 0 ;
// assign handle in listen, and clean up if bind or listen fails
var handle ;
@ -1003,8 +1005,7 @@ var createServerHandle = exports._createServerHandle =
catch ( e ) {
// Not a fd we can listen on. This will trigger an error.
debug ( 'listen invalid fd=' + fd + ': ' + e . message ) ;
process . _ errno = 'EINVAL' ; // hack, callers expect that errno is set
return null ;
return uv . UV_EINVAL ;
}
handle . open ( fd ) ;
handle . readable = true ;
@ -1026,15 +1027,15 @@ var createServerHandle = exports._createServerHandle =
if ( address || port ) {
debug ( 'bind to ' + address ) ;
if ( addressType == 6 ) {
r = handle . bind6 ( address , port ) ;
er r = handle . bind6 ( address , port ) ;
} else {
r = handle . bind ( address , port ) ;
er r = handle . bind ( address , port ) ;
}
}
if ( r ) {
if ( er r) {
handle . close ( ) ;
handle = null ;
return err ;
}
return handle ;
@ -1044,20 +1045,20 @@ var createServerHandle = exports._createServerHandle =
Server . prototype . _ listen2 = function ( address , port , addressType , backlog , fd ) {
debug ( 'listen2' , address , port , addressType , backlog ) ;
var self = this ;
var r = 0 ;
// If there is not yet a handle, we need to create one and bind.
// In the case of a server sent via IPC, we don't need to do this.
if ( ! self . _ handle ) {
debug ( '_listen2: create a handle' ) ;
self . _ handle = createServerHandle ( address , port , addressType , fd ) ;
if ( ! self . _ handle ) {
var error = errnoException ( process . _ errno , 'listen' ) ;
var rval = createServerHandle ( address , port , addressType , fd ) ;
if ( typeof rval === 'number' ) {
var error = errnoException ( rval , 'listen' ) ;
process . nextTick ( function ( ) {
self . emit ( 'error' , error ) ;
} ) ;
return ;
}
self . _ handle = rval ;
} else {
debug ( '_listen2: have a handle already' ) ;
}
@ -1068,10 +1069,10 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
// Use a backlog of 512 entries. We pass 511 to the listen() call because
// the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
// which will thus give us a backlog of 512 entries.
r = self . _ handle . listen ( backlog || 511 ) ;
var er r = self . _ handle . listen ( backlog || 511 ) ;
if ( r ) {
var ex = errnoException ( process . _ errno , 'listen' ) ;
if ( er r) {
var ex = errnoException ( err , 'listen' ) ;
self . _ handle . close ( ) ;
self . _ handle = null ;
process . nextTick ( function ( ) {
@ -1104,9 +1105,15 @@ function listen(self, address, port, addressType, backlog, fd) {
// not actually bound. That's why we check if the actual port matches what
// we requested and if not, raise an error. The exception is when port == 0
// because that means "any random port".
if ( port && handle . getsockname && port != handle . getsockname ( ) . port ) {
self . emit ( 'error' , errnoException ( 'EADDRINUSE' , 'bind' ) ) ;
return ;
if ( port && handle . getsockname ) {
var out = { } ;
var err = handle . getsockname ( out ) ;
if ( err === 0 && port !== out . port ) {
err = uv . UV_EADDRINUSE ;
}
if ( err ) {
return self . emit ( 'error' , errnoException ( err , 'bind' ) ) ;
}
}
self . _ handle = handle ;
@ -1176,7 +1183,10 @@ Server.prototype.listen = function() {
Server . prototype . address = function ( ) {
if ( this . _ handle && this . _ handle . getsockname ) {
return this . _ handle . getsockname ( ) ;
var out = { } ;
var err = this . _ handle . getsockname ( out ) ;
// TODO(bnoordhuis) Check err and throw?
return out ;
} else if ( this . _ pipeName ) {
return this . _ pipeName ;
} else {
@ -1184,14 +1194,14 @@ Server.prototype.address = function() {
}
} ;
function onconnection ( clientHandle ) {
function onconnection ( err , clientHandle ) {
var handle = this ;
var self = handle . owner ;
debug ( 'onconnection' ) ;
if ( ! clientHandle ) {
self . emit ( 'error' , errnoException ( process . _ errno , 'accept' ) ) ;
if ( err ) {
self . emit ( 'error' , errnoException ( err , 'accept' ) ) ;
return ;
}