Browse Source

Throwing in a callback should kill the process

There is a difference between errors which happen to a socket - like
receiving EPIPE - an exceptional situation but ultimately okay and the
situation where code throws in a callback - which is not okay.

Fixes test/simple/test-http-exceptions.js

TODO: explain this in docs.
v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
cbd2c3945b
  1. 16
      lib/http.js
  2. 47
      lib/net.js
  3. 8
      src/node.js
  4. 39
      src/node_http_parser.cc
  5. 8
      test/simple/test-http-upgrade2.js

16
lib/http.js

@ -602,8 +602,11 @@ function connectionListener (socket) {
}); });
socket.ondata = function (d, start, end) { socket.ondata = function (d, start, end) {
var bytesParsed = parser.execute(d, start, end - start); var ret = parser.execute(d, start, end - start);
if (parser.incoming && parser.incoming.upgrade) { if (ret instanceof Error) {
socket.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
var bytesParsed = ret;
socket.ondata = null; socket.ondata = null;
socket.onend = null; socket.onend = null;
@ -624,6 +627,7 @@ function connectionListener (socket) {
socket.onend = function () { socket.onend = function () {
parser.finish(); parser.finish();
// unref the parser for easy gc // unref the parser for easy gc
parsers.free(parser); parsers.free(parser);
@ -703,8 +707,11 @@ function Client ( ) {
if (!parser) { if (!parser) {
throw new Error("parser not initialized prior to Client.ondata call"); throw new Error("parser not initialized prior to Client.ondata call");
} }
var bytesParsed = parser.execute(d, start, end - start); var ret = parser.execute(d, start, end - start);
if (parser.incoming && parser.incoming.upgrade) { if (ret instanceof Error) {
self.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
var bytesParsed = ret;
var upgradeHead = d.slice(start + bytesParsed, end - start); var upgradeHead = d.slice(start + bytesParsed, end - start);
parser.incoming.upgradeHead = upgradeHead; parser.incoming.upgradeHead = upgradeHead;
currentRequest.emit("response", parser.incoming); currentRequest.emit("response", parser.incoming);
@ -734,7 +741,6 @@ function Client ( ) {
self.onend = function () { self.onend = function () {
parser.finish(); parser.finish();
debug("self got end closing. readyState = " + self.readyState); debug("self got end closing. readyState = " + self.readyState);
self.end(); self.end();
}; };

47
lib/net.js

@ -266,13 +266,8 @@ function _doFlush () {
// Stream becomes writeable on connect() but don't flush if there's // Stream becomes writeable on connect() but don't flush if there's
// nothing actually to write // nothing actually to write
if (socket.flush()) { if (socket.flush()) {
try { if (socket._events && socket._events['drain']) socket.emit("drain");
if (socket._events && socket._events['drain']) socket.emit("drain"); if (socket.ondrain) socket.ondrain(); // Optimization
if (socket.ondrain) socket.ondrain(); // Optimization
} catch (e) {
socket.destroy(e);
return;
}
} }
} }
@ -358,13 +353,8 @@ function initStream (self) {
if (!self.writable) self.destroy(); if (!self.writable) self.destroy();
// Note: 'close' not emitted until nextTick. // Note: 'close' not emitted until nextTick.
try { if (self._events && self._events['end']) self.emit('end');
if (self._events && self._events['end']) self.emit('end'); if (self.onend) self.onend();
if (self.onend) self.onend();
} catch (e) {
self.destroy(e);
return;
}
} else if (bytesRead > 0) { } else if (bytesRead > 0) {
timeout.active(self); timeout.active(self);
@ -373,24 +363,19 @@ function initStream (self) {
var end = pool.used + bytesRead; var end = pool.used + bytesRead;
pool.used += bytesRead; pool.used += bytesRead;
try { if (!self._encoding) {
if (!self._encoding) { if (self._events && self._events['data']) {
if (self._events && self._events['data']) { // emit a slice
// emit a slice self.emit('data', pool.slice(start, end));
self.emit('data', pool.slice(start, end));
}
// Optimization: emit the original buffer with end points
if (self.ondata) self.ondata(pool, start, end);
} else if (this._decoder) {
this._decoder.write(pool.slice(start, end));
} else {
var string = pool.toString(self._encoding, start, end);
self.emit('data', string);
} }
} catch (e) {
self.destroy(e); // Optimization: emit the original buffer with end points
return; if (self.ondata) self.ondata(pool, start, end);
} else if (this._decoder) {
this._decoder.write(pool.slice(start, end));
} else {
var string = pool.toString(self._encoding, start, end);
self.emit('data', string);
} }
} }
}; };

8
src/node.js

@ -169,10 +169,10 @@ var stdin;
process.openStdin = function () { process.openStdin = function () {
if (stdin) return stdin; if (stdin) return stdin;
var binding = process.binding('stdio'), var binding = process.binding('stdio'),
net = module.requireNative('net'), net = module.requireNative('net'),
fs = module.requireNative('fs'), fs = module.requireNative('fs'),
fd = binding.openStdin(); fd = binding.openStdin();
if (binding.isStdinBlocking()) { if (binding.isStdinBlocking()) {
stdin = new net.Stream(fd); stdin = new net.Stream(fd);

39
src/node_http_parser.cc

@ -70,7 +70,12 @@ static struct http_parser_settings settings;
if (!cb_value->IsFunction()) return 0; \ if (!cb_value->IsFunction()) return 0; \
Local<Function> cb = Local<Function>::Cast(cb_value); \ Local<Function> cb = Local<Function>::Cast(cb_value); \
Local<Value> ret = cb->Call(parser->handle_, 0, NULL); \ Local<Value> ret = cb->Call(parser->handle_, 0, NULL); \
return ret.IsEmpty() ? -1 : 0; \ if (ret.IsEmpty()) { \
parser->got_exception_ = true; \
return -1; \
} else { \
return 0; \
} \
} }
// Callback prototype for http_data_cb // Callback prototype for http_data_cb
@ -87,7 +92,12 @@ static struct http_parser_settings settings;
}; \ }; \
Local<Value> ret = cb->Call(parser->handle_, 3, argv); \ Local<Value> ret = cb->Call(parser->handle_, 3, argv); \
assert(parser->buffer_); \ assert(parser->buffer_); \
return ret.IsEmpty() ? -1 : 0; \ if (ret.IsEmpty()) { \
parser->got_exception_ = true; \
return -1; \
} else { \
return 0; \
} \
} }
@ -169,7 +179,12 @@ class Parser : public ObjectWrap {
Local<Value> ret = cb->Call(parser->handle_, 1, argv); Local<Value> ret = cb->Call(parser->handle_, 1, argv);
return ret.IsEmpty() ? -1 : 0; if (ret.IsEmpty()) {
parser->got_exception_ = true;
return -1;
} else {
return 0;
}
} }
static Handle<Value> New(const Arguments& args) { static Handle<Value> New(const Arguments& args) {
@ -225,10 +240,9 @@ class Parser : public ObjectWrap {
String::New("Length is extends beyond buffer"))); String::New("Length is extends beyond buffer")));
} }
TryCatch try_catch;
// Assign 'buffer_' while we parse. The callbacks will access that varible. // Assign 'buffer_' while we parse. The callbacks will access that varible.
parser->buffer_ = buffer; parser->buffer_ = buffer;
parser->got_exception_ = false;
size_t nparsed = size_t nparsed =
http_parser_execute(&parser->parser_, settings, buffer->data()+off, len); http_parser_execute(&parser->parser_, settings, buffer->data()+off, len);
@ -238,20 +252,19 @@ class Parser : public ObjectWrap {
parser->buffer_ = NULL; parser->buffer_ = NULL;
// If there was an exception in one of the callbacks // If there was an exception in one of the callbacks
if (try_catch.HasCaught()) return try_catch.ReThrow(); if (parser->got_exception_) return Local<Value>();
Local<Integer> nparsed_obj = Integer::New(nparsed); Local<Integer> nparsed_obj = Integer::New(nparsed);
// If there was a parse error in one of the callbacks // If there was a parse error in one of the callbacks
// TODO What if there is an error on EOF? // TODO What if there is an error on EOF?
if (!parser->parser_.upgrade && nparsed != len) { if (!parser->parser_.upgrade && nparsed != len) {
Local<Value> e = Exception::Error(String::New("Parse Error")); Local<Value> e = Exception::Error(String::NewSymbol("Parse Error"));
Local<Object> obj = e->ToObject(); Local<Object> obj = e->ToObject();
obj->Set(String::NewSymbol("bytesParsed"), nparsed_obj); obj->Set(String::NewSymbol("bytesParsed"), nparsed_obj);
return ThrowException(e); return scope.Close(e);
} else {
return scope.Close(nparsed_obj);
} }
assert(!parser->buffer_);
return scope.Close(nparsed_obj);
} }
static Handle<Value> Finish(const Arguments& args) { static Handle<Value> Finish(const Arguments& args) {
@ -260,9 +273,12 @@ class Parser : public ObjectWrap {
Parser *parser = ObjectWrap::Unwrap<Parser>(args.This()); Parser *parser = ObjectWrap::Unwrap<Parser>(args.This());
assert(!parser->buffer_); assert(!parser->buffer_);
parser->got_exception_ = false;
http_parser_execute(&(parser->parser_), settings, NULL, 0); http_parser_execute(&(parser->parser_), settings, NULL, 0);
if (parser->got_exception_) return Local<Value>();
return Undefined(); return Undefined();
} }
@ -294,6 +310,7 @@ class Parser : public ObjectWrap {
} }
Buffer * buffer_; // The buffer currently being parsed. Buffer * buffer_; // The buffer currently being parsed.
bool got_exception_;
http_parser parser_; http_parser parser_;
}; };

8
test/simple/test-http-upgrade2.js

@ -10,19 +10,21 @@ server = http.createServer(function (req, res) {
server.addListener('upgrade', function (req, socket, upgradeHead) { server.addListener('upgrade', function (req, socket, upgradeHead) {
error('got upgrade event'); error('got upgrade event');
// test that throwing an error from upgrade gets forworded // test that throwing an error from upgrade gets
// to the server'server 'error' event. // is uncaught
throw new Error('upgrade error'); throw new Error('upgrade error');
}); });
gotError = false; gotError = false;
server.addListener('clientError', function (e) { process.addListener('uncaughtException', function (e) {
error('got "clientError" event'); error('got "clientError" event');
assert.equal('upgrade error', e.message); assert.equal('upgrade error', e.message);
gotError = true; gotError = true;
process.exit(0);
}); });
server.listen(PORT); server.listen(PORT);

Loading…
Cancel
Save