Browse Source

inspector: address race conditions

Stress tests uncovered 2 race conditions, when IO events happened during
V8 entering event loop on pause or during Node.js shutdown.

Fixes: nodejs/node#8669
PR-URL: https://github.com/nodejs/node/pull/8672
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Aleksey Kozyatinskiy <kozyatinskiy@chromium.org>
v7.x
Eugene Ostroukhov 8 years ago
committed by James M Snell
parent
commit
7f80ce4f6c
  1. 35
      src/inspector_agent.cc
  2. 16
      test/inspector/inspector-helper.js

35
src/inspector_agent.cc

@ -265,12 +265,13 @@ class AgentImpl {
const String16& message); const String16& message);
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2); void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
void PostIncomingMessage(const String16& message); void PostIncomingMessage(const String16& message);
void WaitForFrontendMessage();
void NotifyMessageReceived();
State ToState(State state); State ToState(State state);
uv_sem_t start_sem_; uv_sem_t start_sem_;
ConditionVariable pause_cond_; ConditionVariable incoming_message_cond_;
Mutex pause_lock_; Mutex state_lock_;
Mutex queue_lock_;
uv_thread_t thread_; uv_thread_t thread_;
uv_loop_t child_loop_; uv_loop_t child_loop_;
@ -370,15 +371,11 @@ class V8NodeInspector : public v8_inspector::V8InspectorClient {
return; return;
terminated_ = false; terminated_ = false;
running_nested_loop_ = true; running_nested_loop_ = true;
agent_->DispatchMessages(); while (!terminated_) {
do { agent_->WaitForFrontendMessage();
{
Mutex::ScopedLock scoped_lock(agent_->pause_lock_);
agent_->pause_cond_.Wait(scoped_lock);
}
while (v8::platform::PumpMessageLoop(platform_, env_->isolate())) while (v8::platform::PumpMessageLoop(platform_, env_->isolate()))
{} {}
} while (!terminated_); }
terminated_ = false; terminated_ = false;
running_nested_loop_ = false; running_nested_loop_ = false;
} }
@ -661,7 +658,6 @@ bool AgentImpl::OnInspectorHandshakeIO(InspectorSocket* socket,
void AgentImpl::OnRemoteDataIO(InspectorSocket* socket, void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
ssize_t read, ssize_t read,
const uv_buf_t* buf) { const uv_buf_t* buf) {
Mutex::ScopedLock scoped_lock(pause_lock_);
if (read > 0) { if (read > 0) {
String16 str = String16::fromUTF8(buf->base, read); String16 str = String16::fromUTF8(buf->base, read);
// TODO(pfeldman): Instead of blocking execution while debugger // TODO(pfeldman): Instead of blocking execution while debugger
@ -686,7 +682,6 @@ void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
if (buf) { if (buf) {
delete[] buf->base; delete[] buf->base;
} }
pause_cond_.Broadcast(scoped_lock);
} }
// static // static
@ -752,14 +747,14 @@ void AgentImpl::WorkerRunIO() {
bool AgentImpl::AppendMessage(MessageQueue* queue, int session_id, bool AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
const String16& message) { const String16& message) {
Mutex::ScopedLock scoped_lock(queue_lock_); Mutex::ScopedLock scoped_lock(state_lock_);
bool trigger_pumping = queue->empty(); bool trigger_pumping = queue->empty();
queue->push_back(std::make_pair(session_id, message)); queue->push_back(std::make_pair(session_id, message));
return trigger_pumping; return trigger_pumping;
} }
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) { void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
Mutex::ScopedLock scoped_lock(queue_lock_); Mutex::ScopedLock scoped_lock(state_lock_);
vector1->swap(*vector2); vector1->swap(*vector2);
} }
@ -771,6 +766,18 @@ void AgentImpl::PostIncomingMessage(const String16& message) {
isolate->RequestInterrupt(InterruptCallback, this); isolate->RequestInterrupt(InterruptCallback, this);
uv_async_send(data_written_); uv_async_send(data_written_);
} }
NotifyMessageReceived();
}
void AgentImpl::WaitForFrontendMessage() {
Mutex::ScopedLock scoped_lock(state_lock_);
if (incoming_message_queue_.empty())
incoming_message_cond_.Wait(scoped_lock);
}
void AgentImpl::NotifyMessageReceived() {
Mutex::ScopedLock scoped_lock(state_lock_);
incoming_message_cond_.Broadcast(scoped_lock);
} }
void AgentImpl::OnInspectorConnectionIO(InspectorSocket* socket) { void AgentImpl::OnInspectorConnectionIO(InspectorSocket* socket) {

16
test/inspector/inspector-helper.js

@ -6,6 +6,8 @@ const http = require('http');
const path = require('path'); const path = require('path');
const spawn = require('child_process').spawn; const spawn = require('child_process').spawn;
const DEBUG = false;
const TIMEOUT = 15 * 1000; const TIMEOUT = 15 * 1000;
const mainScript = path.join(common.fixturesDir, 'loop.js'); const mainScript = path.join(common.fixturesDir, 'loop.js');
@ -13,6 +15,8 @@ const mainScript = path.join(common.fixturesDir, 'loop.js');
function send(socket, message, id, callback) { function send(socket, message, id, callback) {
const msg = JSON.parse(JSON.stringify(message)); // Clone! const msg = JSON.parse(JSON.stringify(message)); // Clone!
msg['id'] = id; msg['id'] = id;
if (DEBUG)
console.log('[sent]', JSON.stringify(msg));
const messageBuf = Buffer.from(JSON.stringify(msg)); const messageBuf = Buffer.from(JSON.stringify(msg));
const wsHeaderBuf = Buffer.allocUnsafe(16); const wsHeaderBuf = Buffer.allocUnsafe(16);
@ -61,6 +65,8 @@ function parseWSFrame(buffer, handler) {
return 0; return 0;
const message = JSON.parse( const message = JSON.parse(
buffer.slice(bodyOffset, bodyOffset + dataLen).toString('utf8')); buffer.slice(bodyOffset, bodyOffset + dataLen).toString('utf8'));
if (DEBUG)
console.log('[received]', JSON.stringify(message));
handler(message); handler(message);
return bodyOffset + dataLen; return bodyOffset + dataLen;
} }
@ -117,7 +123,7 @@ const TestSession = function(socket, harness) {
this.expectedId_ = 1; this.expectedId_ = 1;
this.lastMessageResponseCallback_ = null; this.lastMessageResponseCallback_ = null;
let buffer = Buffer.from(''); let buffer = new Buffer(0);
socket.on('data', (data) => { socket.on('data', (data) => {
buffer = Buffer.concat([buffer, data]); buffer = Buffer.concat([buffer, data]);
let consumed; let consumed;
@ -195,9 +201,10 @@ TestSession.prototype.sendInspectorCommands = function(commands) {
timeoutId = setTimeout(() => { timeoutId = setTimeout(() => {
let s = ''; let s = '';
for (const id in this.messages_) { for (const id in this.messages_) {
s += this.messages_[id] + '\n'; s += id + ', ';
} }
common.fail(s.substring(0, s.length - 1)); common.fail('Messages without response: ' +
s.substring(0, s.length - 2));
}, TIMEOUT); }, TIMEOUT);
}); });
}); });
@ -269,6 +276,7 @@ TestSession.prototype.disconnect = function(childDone) {
this.harness_.childInstanceDone = this.harness_.childInstanceDone =
this.harness_.childInstanceDone || childDone; this.harness_.childInstanceDone || childDone;
this.socket_.end(); this.socket_.end();
console.log('[test]', 'Connection terminated');
callback(); callback();
}); });
}; };
@ -293,7 +301,7 @@ const Harness = function(port, childProcess) {
if (!filter(message)) pending.push(filter); if (!filter(message)) pending.push(filter);
this.stderrFilters_ = pending; this.stderrFilters_ = pending;
})); }));
childProcess.on('close', (code, signal) => { childProcess.on('exit', (code, signal) => {
assert(this.childInstanceDone, 'Child instance died prematurely'); assert(this.childInstanceDone, 'Child instance died prematurely');
this.returnCode_ = code; this.returnCode_ = code;
this.running_ = false; this.running_ = false;

Loading…
Cancel
Save