Browse Source

inspector: reduce implementation in header

This is needed to reduce the coupling between node files that use
node::Environment and inspector class.

Fixes: https://github.com/nodejs/node/issues/7080
PR-URL: https://github.com/nodejs/node/pull/7228
Reviewed-By: bnoordhuis - Ben Noordhuis <info@bnoordhuis.nl>

 Conflicts:
	src/node.cc
v6.x
Eugene Ostroukhov 9 years ago
committed by Jeremiah Senkpiel
parent
commit
7da8a413f6
  1. 203
      src/inspector_agent.cc
  2. 71
      src/inspector_agent.h
  3. 2
      src/node.cc

203
src/inspector_agent.cc

@ -1,8 +1,9 @@
#include "inspector_agent.h"
#include "node.h"
#include "inspector_socket.h"
#include "env.h"
#include "env-inl.h"
#include "node.h"
#include "node_version.h"
#include "v8-platform.h"
#include "util.h"
@ -15,6 +16,7 @@
#include "libplatform/libplatform.h"
#include <string.h>
#include <vector>
// We need pid to use as ID with Chrome
#if defined(_MSC_VER)
@ -150,25 +152,89 @@ namespace inspector {
using blink::protocol::DictionaryValue;
using blink::protocol::String16;
class AgentImpl {
public:
explicit AgentImpl(node::Environment* env);
~AgentImpl();
// Start the inspector agent thread
void Start(v8::Platform* platform, int port, bool wait);
// Stop the inspector agent
void Stop();
bool IsStarted();
bool IsConnected() { return connected_; }
void WaitForDisconnect();
private:
static void ThreadCbIO(void* agent);
static void OnSocketConnectionIO(uv_stream_t* server, int status);
static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
enum inspector_handshake_event state,
const char* path);
static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read,
const uv_buf_t* b);
static void WriteCbIO(uv_async_t* async);
void WorkerRunIO();
void OnInspectorConnectionIO(inspector_socket_t* socket);
void PushPendingMessage(std::vector<std::string>* queue,
const std::string& message);
void SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
std::vector<std::string>* output);
void PostMessages();
void SetConnected(bool connected);
void Write(const std::string& message);
uv_sem_t start_sem_;
uv_cond_t pause_cond_;
uv_mutex_t queue_lock_;
uv_mutex_t pause_lock_;
uv_thread_t thread_;
uv_loop_t child_loop_;
uv_tcp_t server_;
int port_;
bool wait_;
bool connected_;
bool shutting_down_;
node::Environment* parent_env_;
uv_async_t data_written_;
uv_async_t io_thread_req_;
inspector_socket_t* client_socket_;
blink::V8Inspector* inspector_;
v8::Platform* platform_;
std::vector<std::string> message_queue_;
std::vector<std::string> outgoing_message_queue_;
bool dispatching_messages_;
friend class ChannelImpl;
friend class DispatchOnInspectorBackendTask;
friend class SetConnectedTask;
friend class V8NodeInspector;
friend void InterruptCallback(v8::Isolate*, void* agent);
};
void InterruptCallback(v8::Isolate*, void* agent) {
static_cast<Agent*>(agent)->PostMessages();
static_cast<AgentImpl*>(agent)->PostMessages();
}
class DispatchOnInspectorBackendTask : public v8::Task {
public:
explicit DispatchOnInspectorBackendTask(Agent* agent) : agent_(agent) {}
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {}
void Run() override {
agent_->PostMessages();
}
private:
Agent* agent_;
AgentImpl* agent_;
};
class ChannelImpl final : public blink::protocol::FrontendChannel {
public:
explicit ChannelImpl(Agent* agent): agent_(agent) {}
explicit ChannelImpl(AgentImpl* agent): agent_(agent) {}
virtual ~ChannelImpl() {}
private:
virtual void sendProtocolResponse(int sessionId, int callId,
@ -188,12 +254,12 @@ class ChannelImpl final : public blink::protocol::FrontendChannel {
agent_->Write(message->toJSONString().utf8());
}
Agent* const agent_;
AgentImpl* const agent_;
};
class SetConnectedTask : public v8::Task {
public:
SetConnectedTask(Agent* agent, bool connected)
SetConnectedTask(AgentImpl* agent, bool connected)
: agent_(agent),
connected_(connected) {}
@ -202,19 +268,20 @@ class SetConnectedTask : public v8::Task {
}
private:
Agent* agent_;
AgentImpl* agent_;
bool connected_;
};
class V8NodeInspector : public blink::V8Inspector {
public:
V8NodeInspector(Agent* agent, node::Environment* env, v8::Platform* platform)
: blink::V8Inspector(env->isolate(), env->context()),
agent_(agent),
isolate_(env->isolate()),
platform_(platform),
terminated_(false),
running_nested_loop_(false) {}
V8NodeInspector(AgentImpl* agent, node::Environment* env,
v8::Platform* platform)
: blink::V8Inspector(env->isolate(), env->context()),
agent_(agent),
isolate_(env->isolate()),
platform_(platform),
terminated_(false),
running_nested_loop_(false) {}
void runMessageLoopOnPause(int context_group_id) override {
if (running_nested_loop_)
@ -237,28 +304,28 @@ class V8NodeInspector : public blink::V8Inspector {
}
private:
Agent* agent_;
AgentImpl* agent_;
v8::Isolate* isolate_;
v8::Platform* platform_;
bool terminated_;
bool running_nested_loop_;
};
Agent::Agent(Environment* env) : port_(0),
wait_(false),
connected_(false),
shutting_down_(false),
parent_env_(env),
client_socket_(nullptr),
inspector_(nullptr),
platform_(nullptr),
dispatching_messages_(false) {
AgentImpl::AgentImpl(Environment* env) : port_(0),
wait_(false),
connected_(false),
shutting_down_(false),
parent_env_(env),
client_socket_(nullptr),
inspector_(nullptr),
platform_(nullptr),
dispatching_messages_(false) {
int err;
err = uv_sem_init(&start_sem_, 0);
CHECK_EQ(err, 0);
}
Agent::~Agent() {
AgentImpl::~AgentImpl() {
if (!inspector_)
return;
uv_mutex_destroy(&queue_lock_);
@ -267,7 +334,7 @@ Agent::~Agent() {
uv_close(reinterpret_cast<uv_handle_t*>(&data_written_), nullptr);
}
void Agent::Start(v8::Platform* platform, int port, bool wait) {
void AgentImpl::Start(v8::Platform* platform, int port, bool wait) {
auto env = parent_env_;
inspector_ = new V8NodeInspector(this, env, platform);
@ -291,7 +358,7 @@ void Agent::Start(v8::Platform* platform, int port, bool wait) {
port_ = port;
wait_ = wait;
err = uv_thread_create(&thread_, Agent::ThreadCbIO, this);
err = uv_thread_create(&thread_, AgentImpl::ThreadCbIO, this);
CHECK_EQ(err, 0);
uv_sem_wait(&start_sem_);
@ -303,7 +370,7 @@ void Agent::Start(v8::Platform* platform, int port, bool wait) {
}
}
void Agent::Stop() {
void AgentImpl::Stop() {
// TODO(repenaxa): hop on the right thread.
DisconnectAndDisposeIO(client_socket_);
int err = uv_thread_join(&thread_);
@ -316,40 +383,41 @@ void Agent::Stop() {
delete inspector_;
}
bool Agent::IsStarted() {
bool AgentImpl::IsStarted() {
return !!platform_;
}
void Agent::WaitForDisconnect() {
void AgentImpl::WaitForDisconnect() {
shutting_down_ = true;
fprintf(stderr, "Waiting for the debugger to disconnect...\n");
inspector_->runMessageLoopOnPause(0);
}
// static
void Agent::ThreadCbIO(void* agent) {
static_cast<Agent*>(agent)->WorkerRunIO();
void AgentImpl::ThreadCbIO(void* agent) {
static_cast<AgentImpl*>(agent)->WorkerRunIO();
}
// static
void Agent::OnSocketConnectionIO(uv_stream_t* server, int status) {
void AgentImpl::OnSocketConnectionIO(uv_stream_t* server, int status) {
if (status == 0) {
inspector_socket_t* socket =
static_cast<inspector_socket_t*>(malloc(sizeof(*socket)));
ASSERT_NE(nullptr, socket);
memset(socket, 0, sizeof(*socket));
socket->data = server->data;
if (inspector_accept(server, socket, Agent::OnInspectorHandshakeIO) != 0) {
if (inspector_accept(server, socket,
AgentImpl::OnInspectorHandshakeIO) != 0) {
free(socket);
}
}
}
// static
bool Agent::OnInspectorHandshakeIO(inspector_socket_t* socket,
bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket,
enum inspector_handshake_event state,
const char* path) {
Agent* agent = static_cast<Agent*>(socket->data);
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
switch (state) {
case kInspectorHandshakeHttpGet:
return RespondToGet(socket, path, agent->port_);
@ -366,11 +434,11 @@ bool Agent::OnInspectorHandshakeIO(inspector_socket_t* socket,
}
// static
void Agent::OnRemoteDataIO(uv_stream_t* stream,
void AgentImpl::OnRemoteDataIO(uv_stream_t* stream,
ssize_t read,
const uv_buf_t* b) {
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
Agent* agent = static_cast<Agent*>(socket->data);
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
if (read > 0) {
std::string str(b->base, read);
agent->PushPendingMessage(&agent->message_queue_, str);
@ -407,14 +475,14 @@ void Agent::OnRemoteDataIO(uv_stream_t* stream,
uv_cond_broadcast(&agent->pause_cond_);
}
void Agent::PushPendingMessage(std::vector<std::string>* queue,
void AgentImpl::PushPendingMessage(std::vector<std::string>* queue,
const std::string& message) {
uv_mutex_lock(&queue_lock_);
queue->push_back(message);
uv_mutex_unlock(&queue_lock_);
}
void Agent::SwapBehindLock(std::vector<std::string> Agent::*queue,
void AgentImpl::SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
std::vector<std::string>* output) {
uv_mutex_lock(&queue_lock_);
(this->*queue).swap(*output);
@ -422,21 +490,22 @@ void Agent::SwapBehindLock(std::vector<std::string> Agent::*queue,
}
// static
void Agent::WriteCbIO(uv_async_t* async) {
Agent* agent = static_cast<Agent*>(async->data);
void AgentImpl::WriteCbIO(uv_async_t* async) {
AgentImpl* agent = static_cast<AgentImpl*>(async->data);
inspector_socket_t* socket = agent->client_socket_;
if (socket) {
std::vector<std::string> outgoing_messages;
agent->SwapBehindLock(&Agent::outgoing_message_queue_, &outgoing_messages);
agent->SwapBehindLock(&AgentImpl::outgoing_message_queue_,
&outgoing_messages);
for (auto const& message : outgoing_messages)
inspector_write(socket, message.c_str(), message.length());
}
}
void Agent::WorkerRunIO() {
void AgentImpl::WorkerRunIO() {
sockaddr_in addr;
uv_tcp_t server;
int err = uv_async_init(&child_loop_, &io_thread_req_, Agent::WriteCbIO);
int err = uv_async_init(&child_loop_, &io_thread_req_, AgentImpl::WriteCbIO);
CHECK_EQ(0, err);
io_thread_req_.data = this;
uv_tcp_init(&child_loop_, &server);
@ -463,22 +532,22 @@ void Agent::WorkerRunIO() {
uv_run(&child_loop_, UV_RUN_DEFAULT);
}
void Agent::OnInspectorConnectionIO(inspector_socket_t* socket) {
void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) {
if (client_socket_) {
return;
}
client_socket_ = socket;
inspector_read_start(socket, OnBufferAlloc, Agent::OnRemoteDataIO);
inspector_read_start(socket, OnBufferAlloc, AgentImpl::OnRemoteDataIO);
platform_->CallOnForegroundThread(parent_env_->isolate(),
new SetConnectedTask(this, true));
}
void Agent::PostMessages() {
void AgentImpl::PostMessages() {
if (dispatching_messages_)
return;
dispatching_messages_ = true;
std::vector<std::string> messages;
SwapBehindLock(&Agent::message_queue_, &messages);
SwapBehindLock(&AgentImpl::message_queue_, &messages);
for (auto const& message : messages)
inspector_->dispatchMessageFromFrontend(
String16::fromUTF8(message.c_str(), message.length()));
@ -486,7 +555,7 @@ void Agent::PostMessages() {
dispatching_messages_ = false;
}
void Agent::SetConnected(bool connected) {
void AgentImpl::SetConnected(bool connected) {
if (connected_ == connected)
return;
@ -502,9 +571,37 @@ void Agent::SetConnected(bool connected) {
}
}
void Agent::Write(const std::string& message) {
void AgentImpl::Write(const std::string& message) {
PushPendingMessage(&outgoing_message_queue_, message);
ASSERT_EQ(0, uv_async_send(&io_thread_req_));
}
} // namespace debugger
// Exported class Agent
Agent::Agent(node::Environment* env) : impl(new AgentImpl(env)) {}
Agent::~Agent() {
delete impl;
}
void Agent::Start(v8::Platform* platform, int port, bool wait) {
impl->Start(platform, port, wait);
};
void Agent::Stop() {
impl->Stop();
};
bool Agent::IsStarted() {
return impl->IsStarted();
};
bool Agent::IsConnected() {
return impl->IsConnected();
};
void Agent::WaitForDisconnect() {
impl->WaitForDisconnect();
};
} // namespace inspector
} // namespace node

71
src/inspector_agent.h

@ -5,27 +5,19 @@
#error("This header can only be used when inspector is enabled")
#endif
#include "inspector_socket.h"
#include "uv.h"
#include "v8.h"
#include "util.h"
#include <string>
#include <vector>
namespace blink {
class V8Inspector;
}
// Forward declaration to break recursive dependency chain with src/env.h.
namespace node {
class Environment;
} // namespace node
namespace v8 {
class Platform;
} // namespace v8
namespace node {
namespace inspector {
class ChannelImpl;
class AgentImpl;
class Agent {
public:
@ -38,57 +30,10 @@ class Agent {
void Stop();
bool IsStarted();
bool connected() { return connected_; }
bool IsConnected();
void WaitForDisconnect();
protected:
static void ThreadCbIO(void* agent);
static void OnSocketConnectionIO(uv_stream_t* server, int status);
static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
enum inspector_handshake_event state,
const char* path);
static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read,
const uv_buf_t* b);
static void WriteCbIO(uv_async_t* async);
void WorkerRunIO();
void OnInspectorConnectionIO(inspector_socket_t* socket);
void PushPendingMessage(std::vector<std::string>* queue,
const std::string& message);
void SwapBehindLock(std::vector<std::string> Agent::*queue,
std::vector<std::string>* output);
void PostMessages();
void SetConnected(bool connected);
void Write(const std::string& message);
uv_sem_t start_sem_;
uv_cond_t pause_cond_;
uv_mutex_t queue_lock_;
uv_mutex_t pause_lock_;
uv_thread_t thread_;
uv_loop_t child_loop_;
uv_tcp_t server_;
int port_;
bool wait_;
bool connected_;
bool shutting_down_;
node::Environment* parent_env_;
uv_async_t data_written_;
uv_async_t io_thread_req_;
inspector_socket_t* client_socket_;
blink::V8Inspector* inspector_;
v8::Platform* platform_;
std::vector<std::string> message_queue_;
std::vector<std::string> outgoing_message_queue_;
bool dispatching_messages_;
friend class ChannelImpl;
friend class DispatchOnInspectorBackendTask;
friend class SetConnectedTask;
friend class V8NodeInspector;
friend void InterruptCallback(v8::Isolate*, void* agent);
private:
AgentImpl* impl;
};
} // namespace inspector

2
src/node.cc

@ -4467,7 +4467,7 @@ static void StartNodeInstance(void* arg) {
RunAtExit(env);
#if HAVE_INSPECTOR
if (env->inspector_agent()->connected()) {
if (env->inspector_agent()->IsConnected()) {
// Restore signal dispositions, the app is done and is no longer
// capable of handling signals.
#ifdef __POSIX__

Loading…
Cancel
Save