@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
return skipBody ;
return skipBody ;
}
}
// XXX This is a mess.
// TODO: http.Parser should be a Writable emits request/response events.
function parserOnBody ( b , start , len ) {
function parserOnBody ( b , start , len ) {
var parser = this ;
var parser = this ;
var slice = b . slice ( start , start + len ) ;
var stream = parser . incoming ;
if ( parser . incoming . _ paused || parser . incoming . _ pendings . length ) {
var rs = stream . _ readableState ;
parser . incoming . _ pendings . push ( slice ) ;
var socket = stream . socket ;
} else {
parser . incoming . _ emitData ( slice ) ;
// pretend this was the result of a stream._read call.
if ( len > 0 ) {
var slice = b . slice ( start , start + len ) ;
rs . onread ( null , slice ) ;
}
}
if ( rs . length >= rs . highWaterMark )
socket . pause ( ) ;
}
}
function parserOnMessageComplete ( ) {
function parserOnMessageComplete ( ) {
var parser = this ;
var parser = this ;
parser . incoming . complete = true ;
var stream = parser . incoming ;
var socket = stream . socket ;
stream . complete = true ;
// Emit any trailing headers.
// Emit any trailing headers.
var headers = parser . _ headers ;
var headers = parser . _ headers ;
@ -140,19 +151,13 @@ function parserOnMessageComplete() {
parser . _ url = '' ;
parser . _ url = '' ;
}
}
if ( ! parser . incoming . upgrade ) {
if ( ! stream . upgrade )
// For upgraded connections, also emit this after parser.execute
// For upgraded connections, also emit this after parser.execute
if ( parser . incoming . _ paused || parser . incoming . _ pendings . length ) {
stream . _ readableState . onread ( null , null ) ;
parser . incoming . _ pendings . push ( END_OF_FILE ) ;
} else {
parser . incoming . readable = false ;
parser . incoming . _ emitEnd ( ) ;
}
}
if ( parser . socket . readable ) {
if ( parser . socket . readable ) {
// force to read the next incoming message
// force to read the next incoming message
parser . socket . resume ( ) ;
socket . resume ( ) ;
}
}
}
}
@ -263,9 +268,13 @@ function utcDate() {
/* Abstract base class for ServerRequest and ClientResponse. */
/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage ( socket ) {
function IncomingMessage ( socket ) {
Stream . call ( this ) ;
Stream . Readable . call ( this ) ;
// XXX This implementation is kind of all over the place
// When the parser emits body chunks, they go in this list.
// _read() pulls them out, and when it finds EOF, it ends.
this . _ pendings = [ ] ;
// TODO Remove one of these eventually.
this . socket = socket ;
this . socket = socket ;
this . connection = socket ;
this . connection = socket ;
@ -276,77 +285,49 @@ function IncomingMessage(socket) {
this . readable = true ;
this . readable = true ;
this . _ paused = false ;
this . _ pendings = [ ] ;
this . _ pendings = [ ] ;
this . _ pendingIndex = 0 ;
this . _ endEmitted = false ;
// request (server) only
// request (server) only
this . url = '' ;
this . url = '' ;
this . method = null ;
this . method = null ;
// response (client) only
// response (client) only
this . statusCode = null ;
this . statusCode = null ;
this . client = this . socket ;
this . client = this . socket ;
// flag for backwards compatibility grossness.
this . _ consuming = false ;
}
}
util . inherits ( IncomingMessage , Stream ) ;
util . inherits ( IncomingMessage , Stream . Readable ) ;
exports . IncomingMessage = IncomingMessage ;
exports . IncomingMessage = IncomingMessage ;
IncomingMessage . prototype . destroy = function ( error ) {
IncomingMessage . prototype . read = function ( n ) {
this . socket . destroy ( error ) ;
this . _ consuming = true ;
return Stream . Readable . prototype . read . call ( this , n ) ;
} ;
} ;
IncomingMessage . prototype . setEncoding = function ( encoding ) {
IncomingMessage . prototype . _ read = function ( n , callback ) {
var StringDecoder = require ( 'string_decoder' ) . StringDecoder ; // lazy load
// We actually do almost nothing here, because the parserOnBody
this . _ decoder = new StringDecoder ( encoding ) ;
// function fills up our internal buffer directly. However, we
} ;
// do need to unpause the underlying socket so that it flows.
if ( ! this . socket . readable )
return callback ( null , null ) ;
IncomingMessage . prototype . pause = function ( ) {
else
this . _ paused = true ;
this . socket . resume ( ) ;
this . socket . pause ( ) ;
} ;
} ;
IncomingMessage . prototype . resume = function ( ) {
IncomingMessage . prototype . destroy = function ( error ) {
this . _ paused = false ;
this . socket . destroy ( error ) ;
if ( this . socket ) {
this . socket . resume ( ) ;
}
this . _ emitPending ( ) ;
} ;
} ;
IncomingMessage . prototype . _ emitPending = function ( callback ) {
if ( this . _ pendings . length ) {
var self = this ;
process . nextTick ( function ( ) {
while ( ! self . _ paused && self . _ pendings . length ) {
var chunk = self . _ pendings . shift ( ) ;
if ( chunk !== END_OF_FILE ) {
assert ( Buffer . isBuffer ( chunk ) ) ;
self . _ emitData ( chunk ) ;
} else {
assert ( self . _ pendings . length === 0 ) ;
self . readable = false ;
self . _ emitEnd ( ) ;
}
}
if ( callback ) {
callback ( ) ;
}
} ) ;
} else if ( callback ) {
callback ( ) ;
}
} ;
IncomingMessage . prototype . _ emitData = function ( d ) {
IncomingMessage . prototype . _ emitData = function ( d ) {
@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {
// don't keep alive connections where the client expects 100 Continue
// don't keep alive connections where the client expects 100 Continue
// but we sent a final status; they may put extra bytes on the wire.
// but we sent a final status; they may put extra bytes on the wire.
if ( this . _ expect_continue && ! this . _ sent100 ) {
if ( this . _ expect_continue && ! this . _ sent100 ) {
this . shouldKeepAlive = false ;
this . shouldKeepAlive = false ;
}
}
@ -1321,11 +1302,10 @@ function socketCloseListener() {
// Socket closed before we emitted 'end' below.
// Socket closed before we emitted 'end' below.
req . res . emit ( 'aborted' ) ;
req . res . emit ( 'aborted' ) ;
var res = req . res ;
var res = req . res ;
req . res . _ emitPending ( function ( ) {
res . on ( 'end' , function ( ) {
res . _ emitEnd ( ) ;
res . emit ( 'close' ) ;
res . emit ( 'close' ) ;
res = null ;
} ) ;
} ) ;
res . _ readableState . onread ( null , null ) ;
} else if ( ! req . res && ! req . _ hadError ) {
} else if ( ! req . res && ! req . _ hadError ) {
// This socket error fired before we started to
// This socket error fired before we started to
// receive a response. The error needs to
// receive a response. The error needs to
@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
}
}
// client
function parserOnIncomingClient ( res , shouldKeepAlive ) {
function parserOnIncomingClient ( res , shouldKeepAlive ) {
var parser = this ;
var parser = this ;
var socket = this . socket ;
var socket = this . socket ;
var req = socket . _ httpMessage ;
var req = socket . _ httpMessage ;
// propogate "domain" setting...
// propogate "domain" setting...
if ( req . domain && ! res . domain ) {
if ( req . domain && ! res . domain ) {
debug ( 'setting "res.domain"' ) ;
debug ( 'setting "res.domain"' ) ;
@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE ( socket , req ) ;
DTRACE_HTTP_CLIENT_RESPONSE ( socket , req ) ;
COUNTER_HTTP_CLIENT_RESPONSE ( ) ;
COUNTER_HTTP_CLIENT_RESPONSE ( ) ;
req . emit ( 'response' , res ) ;
req . res = res ;
req . res = res ;
res . req = req ;
res . req = req ;
var handled = req . emit ( 'response' , res ) ;
res . on ( 'end' , responseOnEnd ) ;
res . on ( 'end' , responseOnEnd ) ;
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we .resume() it into the void
// so that the socket doesn't hang there in a paused state.
if ( ! handled )
res . resume ( ) ;
return isHeadResponse ;
return isHeadResponse ;
}
}
// client
function responseOnEnd ( ) {
function responseOnEnd ( ) {
var res = this ;
var res = this ;
var req = res . req ;
var req = res . req ;
@ -1784,7 +1772,7 @@ function connectionListener(socket) {
incoming . push ( req ) ;
incoming . push ( req ) ;
var res = new ServerResponse ( req ) ;
var res = new ServerResponse ( req ) ;
debug ( 'server response shouldKeepAlive: ' + shouldKeepAlive ) ;
res . shouldKeepAlive = shouldKeepAlive ;
res . shouldKeepAlive = shouldKeepAlive ;
DTRACE_HTTP_SERVER_REQUEST ( req , socket ) ;
DTRACE_HTTP_SERVER_REQUEST ( req , socket ) ;
COUNTER_HTTP_SERVER_REQUEST ( ) ;
COUNTER_HTTP_SERVER_REQUEST ( ) ;
@ -1806,6 +1794,12 @@ function connectionListener(socket) {
incoming . shift ( ) ;
incoming . shift ( ) ;
// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req.resume() so that the
// bytes will be pulled off the wire.
if ( ! req . _ consuming )
req . resume ( ) ;
res . detachSocket ( socket ) ;
res . detachSocket ( socket ) ;
if ( res . _ last ) {
if ( res . _ last ) {