Browse Source

inspector: split HTTP/WS server from the inspector

Both our team experiments and some embedder request indicate a potential
in implementing alternative transport for inspector - e.g. IPC pipes or
custom embedder APIs. This change moves all HTTP specific code into a
separate class and is a first attempt at defining a boundary between the
inspector agent and transport. This API will be refined as new
transports are implemented.
Note that even without considering alternative transports, this change
enables better testing of the HTTP server (Valgrind made it possible to
identify and fix some existing memory leaks).

PR-URL: https://github.com/nodejs/node/pull/9630
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
v6
Eugene Ostroukhov 8 years ago
parent
commit
42da740ed8
  1. 21
      node.gyp
  2. 418
      src/inspector_agent.cc
  3. 3
      src/inspector_agent.h
  4. 471
      src/inspector_socket_server.cc
  5. 77
      src/inspector_socket_server.h
  6. 517
      test/cctest/test_inspector_socket_server.cc

21
node.gyp

@ -318,8 +318,10 @@
'sources': [
'src/inspector_agent.cc',
'src/inspector_socket.cc',
'src/inspector_socket.h',
'src/inspector_socket_server.cc',
'src/inspector_agent.h',
'src/inspector_socket.h',
'src/inspector_socket_server.h',
],
'dependencies': [
'deps/v8_inspector/src/inspector/inspector.gyp:standalone_inspector',
@ -868,7 +870,8 @@
'dependencies': [ 'deps/gtest/gtest.gyp:gtest' ],
'include_dirs': [
'src',
'deps/v8/include'
'deps/v8/include',
'<(SHARED_INTERMEDIATE_DIR)'
],
'defines': [
# gtest's ASSERT macros conflict with our own.
@ -886,9 +889,21 @@
'conditions': [
['v8_inspector=="true"', {
'defines': [
'HAVE_INSPECTOR=1',
],
'dependencies': [
'deps/zlib/zlib.gyp:zlib',
'v8_inspector_compress_protocol_json#host'
],
'include_dirs': [
'<(SHARED_INTERMEDIATE_DIR)'
],
'sources': [
'src/inspector_socket.cc',
'test/cctest/test_inspector_socket.cc'
'src/inspector_socket_server.cc',
'test/cctest/test_inspector_socket.cc',
'test/cctest/test_inspector_socket_server.cc'
],
'conditions': [
[ 'node_shared_openssl=="false"', {

418
src/inspector_agent.cc

@ -1,6 +1,6 @@
#include "inspector_agent.h"
#include "inspector_socket.h"
#include "inspector_socket_server.h"
#include "env.h"
#include "env-inl.h"
#include "node.h"
@ -37,84 +37,6 @@ static const uint8_t PROTOCOL_JSON[] = {
#include "v8_inspector_protocol_json.h" // NOLINT(build/include_order)
};
std::string GetWsUrl(int port, const std::string& id) {
char buf[1024];
snprintf(buf, sizeof(buf), "127.0.0.1:%d/%s", port, id.c_str());
return buf;
}
void PrintDebuggerReadyMessage(int port, const std::string& id) {
fprintf(stderr, "Debugger listening on port %d.\n"
"Warning: This is an experimental feature and could change at any time.\n"
"To start debugging, open the following URL in Chrome:\n"
" chrome-devtools://devtools/bundled/inspector.html?"
"experiments=true&v8only=true&ws=%s\n",
port, GetWsUrl(port, id).c_str());
fflush(stderr);
}
std::string MapToString(const std::map<std::string, std::string> object) {
std::ostringstream json;
json << "[ {\n";
bool first = true;
for (const auto& name_value : object) {
if (!first)
json << ",\n";
json << " \"" << name_value.first << "\": \"";
json << name_value.second << "\"";
first = false;
}
json << "\n} ]\n\n";
return json.str();
}
void Escape(std::string* string) {
for (char& c : *string) {
c = (c == '\"' || c == '\\') ? '_' : c;
}
}
void DisposeInspector(InspectorSocket* socket, int status) {
delete socket;
}
void DisconnectAndDisposeIO(InspectorSocket* socket) {
if (socket) {
inspector_close(socket, DisposeInspector);
}
}
void OnBufferAlloc(uv_handle_t* handle, size_t len, uv_buf_t* buf) {
buf->base = new char[len];
buf->len = len;
}
void SendHttpResponse(InspectorSocket* socket, const char* response,
size_t size) {
const char HEADERS[] = "HTTP/1.0 200 OK\r\n"
"Content-Type: application/json; charset=UTF-8\r\n"
"Cache-Control: no-cache\r\n"
"Content-Length: %zu\r\n"
"\r\n";
char header[sizeof(HEADERS) + 20];
int header_len = snprintf(header, sizeof(header), HEADERS, size);
inspector_write(socket, header, header_len);
inspector_write(socket, response, size);
}
void SendHttpResponse(InspectorSocket* socket, const std::string& response) {
SendHttpResponse(socket, response.data(), response.size());
}
void SendVersionResponse(InspectorSocket* socket) {
static const char response[] =
"{\n"
" \"Browser\": \"node.js/" NODE_VERSION "\",\n"
" \"Protocol-Version\": \"1.1\"\n"
"}\n";
SendHttpResponse(socket, response, sizeof(response) - 1);
}
std::string GetProcessTitle() {
// uv_get_process_title will trim the title if it is too long.
char title[2048];
@ -126,36 +48,6 @@ std::string GetProcessTitle() {
}
}
void SendProtocolJson(InspectorSocket* socket) {
z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
CHECK_EQ(Z_OK, inflateInit(&strm));
static const size_t kDecompressedSize =
PROTOCOL_JSON[0] * 0x10000u +
PROTOCOL_JSON[1] * 0x100u +
PROTOCOL_JSON[2];
strm.next_in = const_cast<uint8_t*>(PROTOCOL_JSON + 3);
strm.avail_in = sizeof(PROTOCOL_JSON) - 3;
std::string data(kDecompressedSize, '\0');
strm.next_out = reinterpret_cast<Byte*>(&data[0]);
strm.avail_out = data.size();
CHECK_EQ(Z_STREAM_END, inflate(&strm, Z_FINISH));
CHECK_EQ(0, strm.avail_out);
CHECK_EQ(Z_OK, inflateEnd(&strm));
SendHttpResponse(socket, data);
}
const char* match_path_segment(const char* path, const char* expected) {
size_t len = strlen(expected);
if (StringEqualNoCaseN(path, expected, len)) {
if (path[len] == '/') return path + len + 1;
if (path[len] == '\0') return path + len;
}
return nullptr;
}
// UUID RFC: https://www.ietf.org/rfc/rfc4122.txt
// Used ver 4 - with numbers
std::string GenerateID() {
@ -201,17 +93,39 @@ std::string StringViewToUtf8(const StringView& view) {
return result;
}
std::unique_ptr<StringBuffer> Utf8ToStringView(const char* source,
size_t length) {
UnicodeString utf16 = UnicodeString::fromUTF8(StringPiece(source, length));
std::unique_ptr<StringBuffer> Utf8ToStringView(const std::string& message) {
UnicodeString utf16 =
UnicodeString::fromUTF8(StringPiece(message.data(), message.length()));
StringView view(reinterpret_cast<const uint16_t*>(utf16.getBuffer()),
utf16.length());
return StringBuffer::create(view);
}
} // namespace
class V8NodeInspector;
class InspectorAgentDelegate: public node::inspector::SocketServerDelegate {
public:
InspectorAgentDelegate(AgentImpl* agent, const std::string& script_path,
const std::string& script_name, bool wait);
bool StartSession(int session_id, const std::string& target_id) override;
void MessageReceived(int session_id, const std::string& message) override;
void EndSession(int session_id) override;
std::vector<std::string> GetTargetIds() override;
std::string GetTargetTitle(const std::string& id) override;
std::string GetTargetUrl(const std::string& id) override;
bool IsConnected() { return connected_; }
private:
AgentImpl* agent_;
bool connected_;
int session_id_;
const std::string script_name_;
const std::string script_path_;
const std::string target_id_;
bool waiting_;
};
class AgentImpl {
public:
explicit AgentImpl(node::Environment* env);
@ -223,42 +137,37 @@ class AgentImpl {
void Stop();
bool IsStarted();
bool IsConnected() { return state_ == State::kConnected; }
bool IsConnected();
void WaitForDisconnect();
void FatalException(v8::Local<v8::Value> error,
v8::Local<v8::Message> message);
void PostIncomingMessage(int session_id, const std::string& message);
void ResumeStartup() {
uv_sem_post(&start_sem_);
}
private:
using MessageQueue =
std::vector<std::pair<int, std::unique_ptr<StringBuffer>>>;
enum class State { kNew, kAccepting, kConnected, kDone, kError };
static void ThreadCbIO(void* agent);
static void OnSocketConnectionIO(uv_stream_t* server, int status);
static bool OnInspectorHandshakeIO(InspectorSocket* socket,
enum inspector_handshake_event state,
const std::string& path);
static void WriteCbIO(uv_async_t* async);
void InstallInspectorOnProcess();
void WorkerRunIO();
void OnInspectorConnectionIO(InspectorSocket* socket);
void OnRemoteDataIO(InspectorSocket* stream, ssize_t read,
const uv_buf_t* b);
void SetConnected(bool connected);
void DispatchMessages();
void Write(int session_id, const StringView& message);
bool AppendMessage(MessageQueue* vector, int session_id,
std::unique_ptr<StringBuffer> buffer);
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
void PostIncomingMessage(const char* message, size_t len);
void WaitForFrontendMessage();
void NotifyMessageReceived();
State ToState(State state);
void SendListResponse(InspectorSocket* socket);
bool RespondToGet(InspectorSocket* socket, const std::string& path);
uv_sem_t start_sem_;
ConditionVariable incoming_message_cond_;
@ -266,6 +175,8 @@ class AgentImpl {
uv_thread_t thread_;
uv_loop_t child_loop_;
InspectorAgentDelegate* delegate_;
int port_;
bool wait_;
bool shutting_down_;
@ -274,18 +185,15 @@ class AgentImpl {
uv_async_t* data_written_;
uv_async_t io_thread_req_;
InspectorSocket* client_socket_;
V8NodeInspector* inspector_;
v8::Platform* platform_;
MessageQueue incoming_message_queue_;
MessageQueue outgoing_message_queue_;
bool dispatching_messages_;
int frontend_session_id_;
int backend_session_id_;
int session_id_;
InspectorSocketServer* server_;
std::string script_name_;
std::string script_path_;
const std::string id_;
friend class ChannelImpl;
friend class DispatchOnInspectorBackendTask;
@ -300,11 +208,6 @@ void InterruptCallback(v8::Isolate*, void* agent) {
static_cast<AgentImpl*>(agent)->DispatchMessages();
}
void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) {
InspectorSocket* socket = inspector_from_stream(stream);
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf);
}
class DispatchOnInspectorBackendTask : public v8::Task {
public:
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {}
@ -333,7 +236,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel {
void flushProtocolNotifications() override { }
void sendMessageToFrontend(const StringView& message) {
agent_->Write(agent_->frontend_session_id_, message);
agent_->Write(agent_->session_id_, message);
}
AgentImpl* const agent_;
@ -414,19 +317,18 @@ class V8NodeInspector : public v8_inspector::V8InspectorClient {
std::unique_ptr<v8_inspector::V8InspectorSession> session_;
};
AgentImpl::AgentImpl(Environment* env) : port_(0),
AgentImpl::AgentImpl(Environment* env) : delegate_(nullptr),
port_(0),
wait_(false),
shutting_down_(false),
state_(State::kNew),
parent_env_(env),
data_written_(new uv_async_t()),
client_socket_(nullptr),
inspector_(nullptr),
platform_(nullptr),
dispatching_messages_(false),
frontend_session_id_(0),
backend_session_id_(0),
id_(GenerateID()) {
session_id_(0),
server_(nullptr) {
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
memset(&io_thread_req_, 0, sizeof(io_thread_req_));
CHECK_EQ(0, uv_async_init(env->event_loop(), data_written_, nullptr));
@ -543,6 +445,10 @@ void AgentImpl::Stop() {
delete inspector_;
}
bool AgentImpl::IsConnected() {
return delegate_ != nullptr && delegate_->IsConnected();
}
bool AgentImpl::IsStarted() {
return !!platform_;
}
@ -550,6 +456,9 @@ bool AgentImpl::IsStarted() {
void AgentImpl::WaitForDisconnect() {
if (state_ == State::kConnected) {
shutting_down_ = true;
// Gives a signal to stop accepting new connections
// TODO(eugeneo): Introduce an API with explicit request names.
Write(0, StringView());
fprintf(stderr, "Waiting for the debugger to disconnect...\n");
fflush(stderr);
inspector_->runMessageLoopOnPause(0);
@ -621,181 +530,59 @@ void AgentImpl::ThreadCbIO(void* agent) {
static_cast<AgentImpl*>(agent)->WorkerRunIO();
}
// static
void AgentImpl::OnSocketConnectionIO(uv_stream_t* server, int status) {
if (status == 0) {
InspectorSocket* socket = new InspectorSocket();
socket->data = server->data;
if (inspector_accept(server, socket,
AgentImpl::OnInspectorHandshakeIO) != 0) {
delete socket;
}
}
}
// static
bool AgentImpl::OnInspectorHandshakeIO(InspectorSocket* socket,
enum inspector_handshake_event state,
const std::string& path) {
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
switch (state) {
case kInspectorHandshakeHttpGet:
return agent->RespondToGet(socket, path);
case kInspectorHandshakeUpgrading:
return path.length() == agent->id_.length() + 1 &&
path.find(agent->id_) == 1;
case kInspectorHandshakeUpgraded:
agent->OnInspectorConnectionIO(socket);
return true;
case kInspectorHandshakeFailed:
delete socket;
return false;
default:
UNREACHABLE();
return false;
}
}
void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
ssize_t read,
const uv_buf_t* buf) {
if (read > 0) {
// TODO(pfeldman): Instead of blocking execution while debugger
// engages, node should wait for the run callback from the remote client
// and initiate its startup. This is a change to node.cc that should be
// upstreamed separately.
if (wait_) {
std::string message(buf->base, read);
if (message.find("\"Runtime.runIfWaitingForDebugger\"") !=
std::string::npos) {
wait_ = false;
uv_sem_post(&start_sem_);
}
}
PostIncomingMessage(buf->base, read);
} else {
// EOF
if (client_socket_ == socket) {
client_socket_ = nullptr;
PostIncomingMessage(TAG_DISCONNECT, sizeof(TAG_DISCONNECT) - 1);
}
DisconnectAndDisposeIO(socket);
}
if (buf) {
delete[] buf->base;
}
}
void AgentImpl::SendListResponse(InspectorSocket* socket) {
std::map<std::string, std::string> response;
response["description"] = "node.js instance";
response["faviconUrl"] = "https://nodejs.org/static/favicon.ico";
response["id"] = id_;
response["title"] = script_name_.empty() ? GetProcessTitle() : script_name_;
Escape(&response["title"]);
response["type"] = "node";
// This attribute value is a "best effort" URL that is passed as a JSON
// string. It is not guaranteed to resolve to a valid resource.
response["url"] = "file://" + script_path_;
Escape(&response["url"]);
if (!client_socket_) {
std::string address = GetWsUrl(port_, id_);
std::ostringstream frontend_url;
frontend_url << "chrome-devtools://devtools/bundled";
frontend_url << "/inspector.html?experiments=true&v8only=true&ws=";
frontend_url << address;
response["devtoolsFrontendUrl"] += frontend_url.str();
response["webSocketDebuggerUrl"] = "ws://" + address;
}
SendHttpResponse(socket, MapToString(response));
}
bool AgentImpl::RespondToGet(InspectorSocket* socket, const std::string& path) {
const char* command = match_path_segment(path.c_str(), "/json");
if (command == nullptr)
return false;
if (match_path_segment(command, "list") || command[0] == '\0') {
SendListResponse(socket);
return true;
} else if (match_path_segment(command, "protocol")) {
SendProtocolJson(socket);
return true;
} else if (match_path_segment(command, "version")) {
SendVersionResponse(socket);
return true;
} else if (const char* pid = match_path_segment(command, "activate")) {
if (pid != id_)
return false;
SendHttpResponse(socket, "Target activated");
return true;
}
return false;
}
// static
void AgentImpl::WriteCbIO(uv_async_t* async) {
AgentImpl* agent = static_cast<AgentImpl*>(async->data);
InspectorSocket* socket = agent->client_socket_;
if (socket) {
MessageQueue outgoing_messages;
agent->SwapBehindLock(&agent->outgoing_message_queue_, &outgoing_messages);
for (const MessageQueue::value_type& outgoing : outgoing_messages) {
if (outgoing.first == agent->frontend_session_id_) {
StringView message = outgoing.second->string();
std::string utf8Message = StringViewToUtf8(message);
inspector_write(socket, utf8Message.c_str(), utf8Message.length());
}
StringView view = outgoing.second->string();
if (view.length() == 0) {
agent->server_->Stop(nullptr);
} else {
agent->server_->Send(outgoing.first,
StringViewToUtf8(outgoing.second->string()));
}
}
}
void AgentImpl::WorkerRunIO() {
sockaddr_in addr;
uv_tcp_t server;
int err = uv_loop_init(&child_loop_);
CHECK_EQ(err, 0);
err = uv_async_init(&child_loop_, &io_thread_req_, AgentImpl::WriteCbIO);
CHECK_EQ(err, 0);
io_thread_req_.data = this;
std::string script_path;
if (!script_name_.empty()) {
uv_fs_t req;
if (0 == uv_fs_realpath(&child_loop_, &req, script_name_.c_str(), nullptr))
script_path_ = std::string(reinterpret_cast<char*>(req.ptr));
script_path = std::string(reinterpret_cast<char*>(req.ptr));
uv_fs_req_cleanup(&req);
}
uv_tcp_init(&child_loop_, &server);
uv_ip4_addr("0.0.0.0", port_, &addr);
server.data = this;
err = uv_tcp_bind(&server,
reinterpret_cast<const struct sockaddr*>(&addr), 0);
if (err == 0) {
err = uv_listen(reinterpret_cast<uv_stream_t*>(&server), 1,
OnSocketConnectionIO);
}
if (err != 0) {
InspectorAgentDelegate delegate(this, script_path, script_name_, wait_);
delegate_ = &delegate;
InspectorSocketServer server(&delegate, port_);
if (!server.Start(&child_loop_)) {
fprintf(stderr, "Unable to open devtools socket: %s\n", uv_strerror(err));
state_ = State::kError; // Safe, main thread is waiting on semaphore
uv_close(reinterpret_cast<uv_handle_t*>(&io_thread_req_), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&server), nullptr);
uv_loop_close(&child_loop_);
uv_sem_post(&start_sem_);
return;
}
PrintDebuggerReadyMessage(port_, id_);
server_ = &server;
if (!wait_) {
uv_sem_post(&start_sem_);
}
uv_run(&child_loop_, UV_RUN_DEFAULT);
uv_close(reinterpret_cast<uv_handle_t*>(&io_thread_req_), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&server), nullptr);
DisconnectAndDisposeIO(client_socket_);
server.Stop(nullptr);
server.TerminateConnections(nullptr);
uv_run(&child_loop_, UV_RUN_NOWAIT);
err = uv_loop_close(&child_loop_);
CHECK_EQ(err, 0);
delegate_ = nullptr;
server_ = nullptr;
}
bool AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
@ -811,9 +598,10 @@ void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
vector1->swap(*vector2);
}
void AgentImpl::PostIncomingMessage(const char* message, size_t len) {
if (AppendMessage(&incoming_message_queue_, frontend_session_id_,
Utf8ToStringView(message, len))) {
void AgentImpl::PostIncomingMessage(int session_id,
const std::string& message) {
if (AppendMessage(&incoming_message_queue_, session_id,
Utf8ToStringView(message))) {
v8::Isolate* isolate = parent_env_->isolate();
platform_->CallOnForegroundThread(isolate,
new DispatchOnInspectorBackendTask(this));
@ -834,17 +622,6 @@ void AgentImpl::NotifyMessageReceived() {
incoming_message_cond_.Broadcast(scoped_lock);
}
void AgentImpl::OnInspectorConnectionIO(InspectorSocket* socket) {
if (client_socket_) {
DisconnectAndDisposeIO(socket);
return;
}
client_socket_ = socket;
inspector_read_start(socket, OnBufferAlloc, DataCallback);
frontend_session_id_++;
PostIncomingMessage(TAG_CONNECT, sizeof(TAG_CONNECT) - 1);
}
void AgentImpl::DispatchMessages() {
// This function can be reentered if there was an incoming message while
// V8 was processing another inspector request (e.g. if the user is
@ -867,7 +644,7 @@ void AgentImpl::DispatchMessages() {
if (tag == TAG_CONNECT) {
CHECK_EQ(State::kAccepting, state_);
backend_session_id_++;
session_id_ = pair.first;
state_ = State::kConnected;
fprintf(stderr, "Debugger attached.\n");
inspector_->connectFrontend();
@ -876,7 +653,6 @@ void AgentImpl::DispatchMessages() {
if (shutting_down_) {
state_ = State::kDone;
} else {
PrintDebuggerReadyMessage(port_, id_);
state_ = State::kAccepting;
}
inspector_->quitMessageLoopOnPause();
@ -930,6 +706,60 @@ void Agent::FatalException(v8::Local<v8::Value> error,
impl->FatalException(error, message);
}
InspectorAgentDelegate::InspectorAgentDelegate(AgentImpl* agent,
const std::string& script_path,
const std::string& script_name,
bool wait)
: agent_(agent),
connected_(false),
session_id_(0),
script_name_(script_name),
script_path_(script_path),
target_id_(GenerateID()),
waiting_(wait) { }
bool InspectorAgentDelegate::StartSession(int session_id,
const std::string& target_id) {
if (connected_)
return false;
connected_ = true;
agent_->PostIncomingMessage(session_id, TAG_CONNECT);
return true;
}
void InspectorAgentDelegate::MessageReceived(int session_id,
const std::string& message) {
// TODO(pfeldman): Instead of blocking execution while debugger
// engages, node should wait for the run callback from the remote client
// and initiate its startup. This is a change to node.cc that should be
// upstreamed separately.
if (waiting_) {
if (message.find("\"Runtime.runIfWaitingForDebugger\"") !=
std::string::npos) {
waiting_ = false;
agent_->ResumeStartup();
}
}
agent_->PostIncomingMessage(session_id, message);
}
void InspectorAgentDelegate::EndSession(int session_id) {
connected_ = false;
agent_->PostIncomingMessage(session_id, TAG_DISCONNECT);
}
std::vector<std::string> InspectorAgentDelegate::GetTargetIds() {
return { target_id_ };
}
std::string InspectorAgentDelegate::GetTargetTitle(const std::string& id) {
return script_name_.empty() ? GetProcessTitle() : script_name_;
}
std::string InspectorAgentDelegate::GetTargetUrl(const std::string& id) {
return "file://" + script_path_;
}
} // namespace inspector
} // namespace node

3
src/inspector_agent.h

@ -1,6 +1,8 @@
#ifndef SRC_INSPECTOR_AGENT_H_
#define SRC_INSPECTOR_AGENT_H_
#include <stddef.h>
#if !HAVE_INSPECTOR
#error("This header can only be used when inspector is enabled")
#endif
@ -36,7 +38,6 @@ class Agent {
bool IsStarted();
bool IsConnected();
void WaitForDisconnect();
void FatalException(v8::Local<v8::Value> error,
v8::Local<v8::Message> message);
private:

471
src/inspector_socket_server.cc

@ -0,0 +1,471 @@
#include "inspector_socket_server.h"
#include "node.h"
#include "uv.h"
#include "zlib.h"
#include <algorithm>
#include <map>
#include <set>
#include <sstream>
namespace node {
namespace inspector {
namespace {
static const uint8_t PROTOCOL_JSON[] = {
#include "v8_inspector_protocol_json.h" // NOLINT(build/include_order)
};
void Escape(std::string* string) {
for (char& c : *string) {
c = (c == '\"' || c == '\\') ? '_' : c;
}
}
std::string GetWsUrl(int port, const std::string& id) {
char buf[1024];
snprintf(buf, sizeof(buf), "127.0.0.1:%d/%s", port, id.c_str());
return buf;
}
std::string MapToString(const std::map<std::string, std::string>& object) {
bool first = true;
std::ostringstream json;
json << "{\n";
for (const auto& name_value : object) {
if (!first)
json << ",\n";
first = false;
json << " \"" << name_value.first << "\": \"";
json << name_value.second << "\"";
}
json << "\n} ";
return json.str();
}
std::string MapsToString(
const std::vector<std::map<std::string, std::string>>& array) {
bool first = true;
std::ostringstream json;
json << "[ ";
for (const auto& object : array) {
if (!first)
json << ", ";
first = false;
json << MapToString(object);
}
json << "]\n\n";
return json.str();
}
const char* MatchPathSegment(const char* path, const char* expected) {
size_t len = strlen(expected);
if (StringEqualNoCaseN(path, expected, len)) {
if (path[len] == '/') return path + len + 1;
if (path[len] == '\0') return path + len;
}
return nullptr;
}
void OnBufferAlloc(uv_handle_t* handle, size_t len, uv_buf_t* buf) {
buf->base = new char[len];
buf->len = len;
}
void PrintDebuggerReadyMessage(int port, const std::vector<std::string>& ids) {
fprintf(stderr,
"Debugger listening on port %d.\n"
"Warning: This is an experimental feature "
"and could change at any time.\n",
port);
if (ids.size() == 1)
fprintf(stderr, "To start debugging, open the following URL in Chrome:\n");
if (ids.size() > 1)
fprintf(stderr, "To start debugging, open the following URLs in Chrome:\n");
for (const std::string& id : ids) {
fprintf(stderr,
" chrome-devtools://devtools/bundled/inspector.html?"
"experiments=true&v8only=true&ws=%s\n", GetWsUrl(port, id).c_str());
}
fflush(stderr);
}
void SendHttpResponse(InspectorSocket* socket, const std::string& response) {
const char HEADERS[] = "HTTP/1.0 200 OK\r\n"
"Content-Type: application/json; charset=UTF-8\r\n"
"Cache-Control: no-cache\r\n"
"Content-Length: %zu\r\n"
"\r\n";
char header[sizeof(HEADERS) + 20];
int header_len = snprintf(header, sizeof(header), HEADERS, response.size());
inspector_write(socket, header, header_len);
inspector_write(socket, response.data(), response.size());
}
void SendVersionResponse(InspectorSocket* socket) {
std::map<std::string, std::string> response;
response["Browser"] = "node.js/" NODE_VERSION;
response["Protocol-Version"] = "1.1";
SendHttpResponse(socket, MapToString(response));
}
void SendProtocolJson(InspectorSocket* socket) {
z_stream strm;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
CHECK_EQ(Z_OK, inflateInit(&strm));
static const size_t kDecompressedSize =
PROTOCOL_JSON[0] * 0x10000u +
PROTOCOL_JSON[1] * 0x100u +
PROTOCOL_JSON[2];
strm.next_in = const_cast<uint8_t*>(PROTOCOL_JSON + 3);
strm.avail_in = sizeof(PROTOCOL_JSON) - 3;
std::string data(kDecompressedSize, '\0');
strm.next_out = reinterpret_cast<Byte*>(&data[0]);
strm.avail_out = data.size();
CHECK_EQ(Z_STREAM_END, inflate(&strm, Z_FINISH));
CHECK_EQ(0, strm.avail_out);
CHECK_EQ(Z_OK, inflateEnd(&strm));
SendHttpResponse(socket, data);
}
} // namespace
class Closer {
public:
explicit Closer(InspectorSocketServer* server) : server_(server),
close_count_(0) { }
void AddCallback(InspectorSocketServer::ServerCallback callback) {
if (callback == nullptr)
return;
callbacks_.insert(callback);
}
void DecreaseExpectedCount() {
--close_count_;
NotifyIfDone();
}
void IncreaseExpectedCount() {
++close_count_;
}
void NotifyIfDone() {
if (close_count_ == 0) {
for (auto callback : callbacks_) {
callback(server_);
}
InspectorSocketServer* server = server_;
delete server->closer_;
server->closer_ = nullptr;
}
}
private:
InspectorSocketServer* server_;
std::set<InspectorSocketServer::ServerCallback> callbacks_;
int close_count_;
};
class SocketSession {
public:
SocketSession(InspectorSocketServer* server, int id);
void Close(bool socket_cleanup, Closer* closer);
void Declined() { state_ = State::kDeclined; }
static SocketSession* From(InspectorSocket* socket) {
return node::ContainerOf(&SocketSession::socket_, socket);
}
void FrontendConnected();
InspectorSocketServer* GetServer() { return server_; }
int Id() { return id_; }
void Send(const std::string& message);
void SetTargetId(const std::string& target_id) {
CHECK(target_id_.empty());
target_id_ = target_id;
}
InspectorSocket* Socket() { return &socket_; }
const std::string TargetId() { return target_id_; }
private:
enum class State { kHttp, kWebSocket, kClosing, kEOF, kDeclined };
static void CloseCallback_(InspectorSocket* socket, int code);
static void ReadCallback_(uv_stream_t* stream, ssize_t read,
const uv_buf_t* buf);
void OnRemoteDataIO(InspectorSocket* socket, ssize_t read,
const uv_buf_t* buf);
const int id_;
Closer* closer_;
InspectorSocket socket_;
InspectorSocketServer* server_;
std::string target_id_;
State state_;
};
InspectorSocketServer::InspectorSocketServer(SocketServerDelegate* delegate,
int port) : loop_(nullptr),
delegate_(delegate),
port_(port),
closer_(nullptr),
next_session_id_(0) { }
// static
bool InspectorSocketServer::HandshakeCallback(InspectorSocket* socket,
inspector_handshake_event event,
const std::string& path) {
InspectorSocketServer* server = SocketSession::From(socket)->GetServer();
const std::string& id = path.empty() ? path : path.substr(1);
switch (event) {
case kInspectorHandshakeHttpGet:
return server->RespondToGet(socket, path);
case kInspectorHandshakeUpgrading:
return server->SessionStarted(SocketSession::From(socket), id);
case kInspectorHandshakeUpgraded:
SocketSession::From(socket)->FrontendConnected();
return true;
case kInspectorHandshakeFailed:
SocketSession::From(socket)->Close(false, nullptr);
return false;
default:
UNREACHABLE();
return false;
}
}
bool InspectorSocketServer::SessionStarted(SocketSession* session,
const std::string& id) {
bool connected = false;
if (TargetExists(id)) {
connected = delegate_->StartSession(session->Id(), id);
}
if (connected) {
connected_sessions_[session->Id()] = session;
session->SetTargetId(id);
} else {
session->Declined();
}
return connected;
}
void InspectorSocketServer::SessionTerminated(int session_id) {
if (connected_sessions_.erase(session_id) == 0) {
return;
}
delegate_->EndSession(session_id);
if (connected_sessions_.empty() &&
uv_is_active(reinterpret_cast<uv_handle_t*>(&server_))) {
PrintDebuggerReadyMessage(port_, delegate_->GetTargetIds());
}
}
bool InspectorSocketServer::RespondToGet(InspectorSocket* socket,
const std::string& path) {
const char* command = MatchPathSegment(path.c_str(), "/json");
if (command == nullptr)
return false;
if (MatchPathSegment(command, "list") || command[0] == '\0') {
SendListResponse(socket);
return true;
} else if (MatchPathSegment(command, "protocol")) {
SendProtocolJson(socket);
return true;
} else if (MatchPathSegment(command, "version")) {
SendVersionResponse(socket);
return true;
} else if (const char* target_id = MatchPathSegment(command, "activate")) {
if (TargetExists(target_id)) {
SendHttpResponse(socket, "Target activated");
return true;
}
return false;
}
return false;
}
void InspectorSocketServer::SendListResponse(InspectorSocket* socket) {
std::vector<std::map<std::string, std::string>> response;
for (const std::string& id : delegate_->GetTargetIds()) {
response.push_back(std::map<std::string, std::string>());
std::map<std::string, std::string>& target_map = response.back();
target_map["description"] = "node.js instance";
target_map["faviconUrl"] = "https://nodejs.org/static/favicon.ico";
target_map["id"] = id;
target_map["title"] = delegate_->GetTargetTitle(id);
Escape(&target_map["title"]);
target_map["type"] = "node";
// This attribute value is a "best effort" URL that is passed as a JSON
// string. It is not guaranteed to resolve to a valid resource.
target_map["url"] = delegate_->GetTargetUrl(id);
Escape(&target_map["url"]);
bool connected = false;
for (const auto& session : connected_sessions_) {
if (session.second->TargetId() == id) {
connected = true;
break;
}
}
if (!connected) {
std::string address = GetWsUrl(port_, id);
std::ostringstream frontend_url;
frontend_url << "chrome-devtools://devtools/bundled";
frontend_url << "/inspector.html?experiments=true&v8only=true&ws=";
frontend_url << address;
target_map["devtoolsFrontendUrl"] += frontend_url.str();
target_map["webSocketDebuggerUrl"] = "ws://" + address;
}
}
SendHttpResponse(socket, MapsToString(response));
}
bool InspectorSocketServer::Start(uv_loop_t* loop) {
loop_ = loop;
sockaddr_in addr;
uv_tcp_init(loop_, &server_);
uv_ip4_addr("0.0.0.0", port_, &addr);
int err = uv_tcp_bind(&server_,
reinterpret_cast<const struct sockaddr*>(&addr), 0);
if (err == 0) {
err = uv_listen(reinterpret_cast<uv_stream_t*>(&server_), 1,
SocketConnectedCallback);
}
if (err == 0 && connected_sessions_.empty()) {
PrintDebuggerReadyMessage(port_, delegate_->GetTargetIds());
}
if (err != 0 && connected_sessions_.empty()) {
fprintf(stderr, "Unable to open devtools socket: %s\n", uv_strerror(err));
uv_close(reinterpret_cast<uv_handle_t*>(&server_), nullptr);
return false;
}
return true;
}
void InspectorSocketServer::Stop(ServerCallback cb) {
if (closer_ == nullptr) {
closer_ = new Closer(this);
}
closer_->AddCallback(cb);
uv_handle_t* handle = reinterpret_cast<uv_handle_t*>(&server_);
if (uv_is_active(handle)) {
closer_->IncreaseExpectedCount();
uv_close(reinterpret_cast<uv_handle_t*>(&server_), ServerClosedCallback);
}
closer_->NotifyIfDone();
}
void InspectorSocketServer::TerminateConnections(ServerCallback cb) {
if (closer_ == nullptr) {
closer_ = new Closer(this);
}
closer_->AddCallback(cb);
std::map<int, SocketSession*> sessions;
std::swap(sessions, connected_sessions_);
for (const auto& session : sessions) {
int id = session.second->Id();
session.second->Close(true, closer_);
delegate_->EndSession(id);
}
closer_->NotifyIfDone();
}
bool InspectorSocketServer::TargetExists(const std::string& id) {
const std::vector<std::string>& target_ids = delegate_->GetTargetIds();
const auto& found = std::find(target_ids.begin(), target_ids.end(), id);
return found != target_ids.end();
}
void InspectorSocketServer::Send(int session_id, const std::string& message) {
auto session_iterator = connected_sessions_.find(session_id);
if (session_iterator != connected_sessions_.end()) {
session_iterator->second->Send(message);
}
}
// static
void InspectorSocketServer::ServerClosedCallback(uv_handle_t* server) {
InspectorSocketServer* socket_server = InspectorSocketServer::From(server);
if (socket_server->closer_)
socket_server->closer_->DecreaseExpectedCount();
}
// static
void InspectorSocketServer::SocketConnectedCallback(uv_stream_t* server,
int status) {
if (status == 0) {
InspectorSocketServer* socket_server = InspectorSocketServer::From(server);
SocketSession* session =
new SocketSession(socket_server, socket_server->next_session_id_++);
if (inspector_accept(server, session->Socket(), HandshakeCallback) != 0) {
delete session;
}
}
}
// InspectorSession tracking
SocketSession::SocketSession(InspectorSocketServer* server, int id)
: id_(id), closer_(nullptr), server_(server),
state_(State::kHttp) { }
void SocketSession::Close(bool socket_cleanup, Closer* closer) {
CHECK_EQ(closer_, nullptr);
CHECK_NE(state_, State::kClosing);
server_->SessionTerminated(id_);
if (socket_cleanup) {
state_ = State::kClosing;
closer_ = closer;
if (closer_ != nullptr)
closer->IncreaseExpectedCount();
inspector_close(&socket_, CloseCallback_);
} else {
delete this;
}
}
// static
void SocketSession::CloseCallback_(InspectorSocket* socket, int code) {
SocketSession* session = SocketSession::From(socket);
CHECK_EQ(State::kClosing, session->state_);
Closer* closer = session->closer_;
if (closer != nullptr)
closer->DecreaseExpectedCount();
delete session;
}
void SocketSession::FrontendConnected() {
CHECK_EQ(State::kHttp, state_);
state_ = State::kWebSocket;
inspector_read_start(&socket_, OnBufferAlloc, ReadCallback_);
}
// static
void SocketSession::ReadCallback_(uv_stream_t* stream, ssize_t read,
const uv_buf_t* buf) {
InspectorSocket* socket = inspector_from_stream(stream);
SocketSession::From(socket)->OnRemoteDataIO(socket, read, buf);
}
void SocketSession::OnRemoteDataIO(InspectorSocket* socket, ssize_t read,
const uv_buf_t* buf) {
if (read > 0) {
server_->Delegate()->MessageReceived(id_, std::string(buf->base, read));
} else {
server_->SessionTerminated(id_);
Close(true, nullptr);
}
if (buf != nullptr && buf->base != nullptr)
delete[] buf->base;
}
void SocketSession::Send(const std::string& message) {
inspector_write(&socket_, message.data(), message.length());
}
} // namespace inspector
} // namespace node

77
src/inspector_socket_server.h

@ -0,0 +1,77 @@
#ifndef SRC_INSPECTOR_SOCKET_SERVER_H_
#define SRC_INSPECTOR_SOCKET_SERVER_H_
#include "inspector_agent.h"
#include "inspector_socket.h"
#include "uv.h"
#include <map>
#include <string>
#include <vector>
#if !HAVE_INSPECTOR
#error("This header can only be used when inspector is enabled")
#endif
namespace node {
namespace inspector {
class Closer;
class SocketSession;
class SocketServerDelegate {
public:
virtual bool StartSession(int session_id, const std::string& target_id) = 0;
virtual void EndSession(int session_id) = 0;
virtual void MessageReceived(int session_id, const std::string& message) = 0;
virtual std::vector<std::string> GetTargetIds() = 0;
virtual std::string GetTargetTitle(const std::string& id) = 0;
virtual std::string GetTargetUrl(const std::string& id) = 0;
};
class InspectorSocketServer {
public:
using ServerCallback = void (*)(InspectorSocketServer*);
InspectorSocketServer(SocketServerDelegate* delegate, int port);
bool Start(uv_loop_t* loop);
void Stop(ServerCallback callback);
void Send(int session_id, const std::string& message);
void TerminateConnections(ServerCallback callback);
private:
static bool HandshakeCallback(InspectorSocket* socket,
enum inspector_handshake_event state,
const std::string& path);
static void SocketConnectedCallback(uv_stream_t* server, int status);
static void ServerClosedCallback(uv_handle_t* server);
template<typename SomeUvStruct>
static InspectorSocketServer* From(SomeUvStruct* server) {
return node::ContainerOf(&InspectorSocketServer::server_,
reinterpret_cast<uv_tcp_t*>(server));
}
bool RespondToGet(InspectorSocket* socket, const std::string& path);
void SendListResponse(InspectorSocket* socket);
void ReadCallback(InspectorSocket* socket, ssize_t read, const uv_buf_t* buf);
bool SessionStarted(SocketSession* session, const std::string& id);
void SessionTerminated(int id);
bool TargetExists(const std::string& id);
static void SocketSessionDeleter(SocketSession*);
SocketServerDelegate* Delegate() { return delegate_; }
uv_loop_t* loop_;
SocketServerDelegate* const delegate_;
const int port_;
std::string path_;
uv_tcp_t server_;
Closer* closer_;
std::map<int, SocketSession*> connected_sessions_;
int next_session_id_;
friend class SocketSession;
friend class Closer;
};
} // namespace inspector
} // namespace node
#endif // SRC_INSPECTOR_SOCKET_SERVER_H_

517
test/cctest/test_inspector_socket_server.cc

@ -0,0 +1,517 @@
#include "inspector_socket_server.h"
#include "node.h"
#include "gtest/gtest.h"
#include <algorithm>
#include <sstream>
static const int PORT = 9229;
static uv_loop_t loop;
static const char CLIENT_CLOSE_FRAME[] = "\x88\x80\x2D\x0E\x1E\xFA";
static const char SERVER_CLOSE_FRAME[] = "\x88\x00";
static const char MAIN_TARGET_ID[] = "main-target";
static const char UNCONNECTABLE_TARGET_ID[] = "unconnectable-target";
static const char WS_HANDSHAKE_RESPONSE[] =
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: Dt87H1OULVZnSJo/KgMUYI7xPCg=\r\n\r\n";
#define SPIN_WHILE(condition) \
{ \
Timeout timeout(&loop); \
while ((condition) && !timeout.timed_out) { \
uv_run(&loop, UV_RUN_NOWAIT); \
} \
ASSERT_FALSE((condition)); \
}
namespace {
using InspectorSocketServer = node::inspector::InspectorSocketServer;
using SocketServerDelegate = node::inspector::SocketServerDelegate;
class Timeout {
public:
explicit Timeout(uv_loop_t* loop) : timed_out(false), done_(false) {
uv_timer_init(loop, &timer_);
uv_timer_start(&timer_, Timeout::set_flag, 5000, 0);
}
~Timeout() {
uv_timer_stop(&timer_);
uv_close(reinterpret_cast<uv_handle_t*>(&timer_), mark_done);
while (!done_) {
uv_run(&loop, UV_RUN_NOWAIT);
}
}
bool timed_out;
private:
static void set_flag(uv_timer_t* timer) {
Timeout* t = node::ContainerOf(&Timeout::timer_, timer);
t->timed_out = true;
}
static void mark_done(uv_handle_t* timer) {
Timeout* t = node::ContainerOf(&Timeout::timer_,
reinterpret_cast<uv_timer_t*>(timer));
t->done_ = true;
}
bool done_;
uv_timer_t timer_;
};
class InspectorSocketServerTest : public ::testing::Test {
protected:
void SetUp() override {
uv_loop_init(&loop);
}
void TearDown() override {
const int err = uv_loop_close(&loop);
if (err != 0) {
uv_print_all_handles(&loop, stderr);
}
EXPECT_EQ(0, err);
}
};
class TestInspectorServerDelegate : public SocketServerDelegate {
public:
TestInspectorServerDelegate() : connected(0), disconnected(0),
targets_({ MAIN_TARGET_ID,
UNCONNECTABLE_TARGET_ID }) {}
void Connect(InspectorSocketServer* server) {
server_ = server;
}
bool StartSession(int session_id, const std::string& target_id) override {
buffer_.clear();
CHECK_NE(targets_.end(),
std::find(targets_.begin(), targets_.end(), target_id));
if (target_id == UNCONNECTABLE_TARGET_ID) {
return false;
}
connected++;
session_id_ = session_id;
return true;
}
void MessageReceived(int session_id, const std::string& message) override {
ASSERT_EQ(session_id_, session_id);
buffer_.insert(buffer_.end(), message.begin(), message.end());
}
void EndSession(int session_id) override {
ASSERT_EQ(session_id_, session_id);
disconnected++;
}
std::vector<std::string> GetTargetIds() override {
return targets_;
}
std::string GetTargetTitle(const std::string& id) override {
return id + " Target Title";
}
std::string GetTargetUrl(const std::string& id) override {
return "file://" + id + "/script.js";
}
void Expect(const std::string& expects) {
SPIN_WHILE(buffer_.size() < expects.length());
ASSERT_STREQ(std::string(buffer_.data(), expects.length()).c_str(),
expects.c_str());
buffer_.erase(buffer_.begin(), buffer_.begin() + expects.length());
}
void Write(const std::string& message) {
server_->Send(session_id_, message);
}
int connected;
int disconnected;
private:
const std::vector<std::string> targets_;
InspectorSocketServer* server_;
int session_id_;
std::vector<char> buffer_;
};
class SocketWrapper {
public:
explicit SocketWrapper(uv_loop_t* loop) : closed_(false),
eof_(false),
loop_(loop),
connected_(false),
sending_(false) { }
void Connect(std::string host, int port) {
closed_ = false;
connection_failed_ = false;
connected_ = false;
eof_ = false;
contents_.clear();
uv_tcp_init(loop_, &socket_);
sockaddr_in addr;
uv_ip4_addr(host.c_str(), PORT, &addr);
int err = uv_tcp_connect(&connect_, &socket_,
reinterpret_cast<const sockaddr*>(&addr),
Connected_);
ASSERT_EQ(0, err);
SPIN_WHILE(!connected_)
uv_read_start(reinterpret_cast<uv_stream_t*>(&socket_), AllocCallback,
ReadCallback);
}
void ExpectFailureToConnect(std::string host, int port) {
connected_ = false;
connection_failed_ = false;
closed_ = false;
eof_ = false;
contents_.clear();
uv_tcp_init(loop_, &socket_);
sockaddr_in addr;
uv_ip4_addr(host.c_str(), PORT, &addr);
int err = uv_tcp_connect(&connect_, &socket_,
reinterpret_cast<const sockaddr*>(&addr),
ConnectionMustFail_);
ASSERT_EQ(0, err);
SPIN_WHILE(!connection_failed_)
uv_read_start(reinterpret_cast<uv_stream_t*>(&socket_), AllocCallback,
ReadCallback);
}
void Close() {
uv_close(reinterpret_cast<uv_handle_t*>(&socket_), ClosedCallback);
SPIN_WHILE(!closed_);
}
void Expect(const std::string& expects) {
SPIN_WHILE(contents_.size() < expects.length());
ASSERT_STREQ(expects.c_str(),
std::string(contents_.data(), expects.length()).c_str());
contents_.erase(contents_.begin(), contents_.begin() + expects.length());
}
void ExpectEOF() {
SPIN_WHILE(!eof_);
Close();
}
void TestHttpRequest(const std::string& path,
const std::string& expected_reply) {
std::ostringstream expectations;
expectations << "HTTP/1.0 200 OK\r\n"
"Content-Type: application/json; charset=UTF-8\r\n"
"Cache-Control: no-cache\r\n"
"Content-Length: ";
expectations << expected_reply.length() + 2;
expectations << "\r\n\r\n" << expected_reply << "\n\n";
Write("GET " + path + " HTTP/1.1\r\n"
"Host: localhost:9229\r\n\r\n");
Expect(expectations.str());
}
void Write(const std::string& data) {
ASSERT_FALSE(sending_);
uv_buf_t buf[1];
buf[0].base = const_cast<char*>(data.data());
buf[0].len = data.length();
sending_ = true;
int err = uv_write(&write_, reinterpret_cast<uv_stream_t*>(&socket_),
buf, 1, WriteDone_);
ASSERT_EQ(err, 0);
SPIN_WHILE(sending_);
}
private:
static void AllocCallback(uv_handle_t*, size_t size, uv_buf_t* buf) {
*buf = uv_buf_init(new char[size], size);
}
static void ClosedCallback(uv_handle_t* handle) {
SocketWrapper* wrapper =
node::ContainerOf(&SocketWrapper::socket_,
reinterpret_cast<uv_tcp_t*>(handle));
ASSERT_FALSE(wrapper->closed_);
wrapper->closed_ = true;
}
static void Connected_(uv_connect_t* connect, int status) {
EXPECT_EQ(0, status);
SocketWrapper* wrapper =
node::ContainerOf(&SocketWrapper::connect_, connect);
wrapper->connected_ = true;
}
static void ConnectionMustFail_(uv_connect_t* connect, int status) {
EXPECT_EQ(UV_ECONNREFUSED, status);
SocketWrapper* wrapper =
node::ContainerOf(&SocketWrapper::connect_, connect);
wrapper->connection_failed_ = true;
}
static void ReadCallback(uv_stream_t* stream, ssize_t read,
const uv_buf_t* buf) {
SocketWrapper* wrapper =
node::ContainerOf(&SocketWrapper::socket_,
reinterpret_cast<uv_tcp_t*>(stream));
if (read == UV_EOF) {
wrapper->eof_ = true;
} else {
wrapper->contents_.insert(wrapper->contents_.end(), buf->base,
buf->base + read);
}
delete[] buf->base;
}
static void WriteDone_(uv_write_t* req, int err) {
ASSERT_EQ(0, err);
SocketWrapper* wrapper =
node::ContainerOf(&SocketWrapper::write_, req);
ASSERT_TRUE(wrapper->sending_);
wrapper->sending_ = false;
}
bool IsConnected() { return connected_; }
bool closed_;
bool eof_;
uv_loop_t* loop_;
uv_tcp_t socket_;
uv_connect_t connect_;
uv_write_t write_;
bool connected_;
bool connection_failed_;
bool sending_;
std::vector<char> contents_;
};
class ServerHolder {
public:
template <typename Delegate>
ServerHolder(Delegate* delegate, int port)
: closed(false), paused(false), sessions_terminated(false),
server_(delegate, port) {
delegate->Connect(&server_);
}
InspectorSocketServer* operator->() {
return &server_;
}
static void CloseCallback(InspectorSocketServer* server) {
ServerHolder* holder = node::ContainerOf(&ServerHolder::server_, server);
holder->closed = true;
}
static void ConnectionsTerminated(InspectorSocketServer* server) {
ServerHolder* holder = node::ContainerOf(&ServerHolder::server_, server);
holder->sessions_terminated = true;
}
static void PausedCallback(InspectorSocketServer* server) {
ServerHolder* holder = node::ContainerOf(&ServerHolder::server_, server);
holder->paused = true;
}
bool closed;
bool paused;
bool sessions_terminated;
private:
InspectorSocketServer server_;
};
class ServerDelegateNoTargets : public SocketServerDelegate {
public:
void Connect(InspectorSocketServer* server) { }
void MessageReceived(int session_id, const std::string& message) override { }
void EndSession(int session_id) override { }
bool StartSession(int session_id, const std::string& target_id) override {
return false;
}
std::vector<std::string> GetTargetIds() override {
return std::vector<std::string>();
}
std::string GetTargetTitle(const std::string& id) override {
return "";
}
std::string GetTargetUrl(const std::string& id) override {
return "";
}
};
static void TestHttpRequest(int port, const std::string& path,
const std::string& expected_body) {
SocketWrapper socket(&loop);
socket.Connect("0.0.0.0", port);
socket.TestHttpRequest(path, expected_body);
socket.Close();
}
static const std::string WsHandshakeRequest(const std::string& target_id) {
return "GET /" + target_id + " HTTP/1.1\r\n"
"Host: localhost:9229\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: aaa==\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n";
}
} // namespace
TEST_F(InspectorSocketServerTest, InspectorSessions) {
TestInspectorServerDelegate delegate;
ServerHolder server(&delegate, PORT);
ASSERT_TRUE(server->Start(&loop));
SocketWrapper well_behaved_socket(&loop);
// Regular connection
well_behaved_socket.Connect("0.0.0.0", PORT);
well_behaved_socket.Write(WsHandshakeRequest(MAIN_TARGET_ID));
well_behaved_socket.Expect(WS_HANDSHAKE_RESPONSE);
EXPECT_EQ(1, delegate.connected);
well_behaved_socket.Write("\x81\x84\x7F\xC2\x66\x31\x4E\xF0\x55\x05");
delegate.Expect("1234");
delegate.Write("5678");
well_behaved_socket.Expect("\x81\x4" "5678");
well_behaved_socket.Write(CLIENT_CLOSE_FRAME);
well_behaved_socket.Expect(SERVER_CLOSE_FRAME);
EXPECT_EQ(1, delegate.disconnected);
well_behaved_socket.Close();
// Declined connection
SocketWrapper declined_target_socket(&loop);
declined_target_socket.Connect("127.0.0.1", PORT);
declined_target_socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID));
declined_target_socket.Expect("HTTP/1.0 400 Bad Request");
declined_target_socket.ExpectEOF();
EXPECT_EQ(1, delegate.connected);
EXPECT_EQ(1, delegate.disconnected);
// Bogus target - start session callback should not even be invoked
SocketWrapper bogus_target_socket(&loop);
bogus_target_socket.Connect("127.0.0.1", PORT);
bogus_target_socket.Write(WsHandshakeRequest("bogus_target"));
bogus_target_socket.Expect("HTTP/1.0 400 Bad Request");
bogus_target_socket.ExpectEOF();
EXPECT_EQ(1, delegate.connected);
EXPECT_EQ(1, delegate.disconnected);
// Drop connection (no proper close frames)
SocketWrapper dropped_connection_socket(&loop);
dropped_connection_socket.Connect("127.0.0.1", PORT);
dropped_connection_socket.Write(WsHandshakeRequest(MAIN_TARGET_ID));
dropped_connection_socket.Expect(WS_HANDSHAKE_RESPONSE);
EXPECT_EQ(2, delegate.connected);
delegate.Write("5678");
dropped_connection_socket.Expect("\x81\x4" "5678");
dropped_connection_socket.Close();
SPIN_WHILE(delegate.disconnected < 2);
// Reconnect regular connection
SocketWrapper stays_till_termination_socket(&loop);
stays_till_termination_socket.Connect("127.0.0.1", PORT);
stays_till_termination_socket.Write(WsHandshakeRequest(MAIN_TARGET_ID));
stays_till_termination_socket.Expect(WS_HANDSHAKE_RESPONSE);
EXPECT_EQ(3, delegate.connected);
delegate.Write("5678");
stays_till_termination_socket.Expect("\x81\x4" "5678");
stays_till_termination_socket
.Write("\x81\x84\x7F\xC2\x66\x31\x4E\xF0\x55\x05");
delegate.Expect("1234");
server->Stop(ServerHolder::CloseCallback);
server->TerminateConnections(ServerHolder::ConnectionsTerminated);
stays_till_termination_socket.Write(CLIENT_CLOSE_FRAME);
stays_till_termination_socket.Expect(SERVER_CLOSE_FRAME);
EXPECT_EQ(3, delegate.disconnected);
SPIN_WHILE(!server.closed);
stays_till_termination_socket.ExpectEOF();
}
TEST_F(InspectorSocketServerTest, ServerDoesNothing) {
TestInspectorServerDelegate delegate;
ServerHolder server(&delegate, PORT);
ASSERT_TRUE(server->Start(&loop));
server->Stop(ServerHolder::CloseCallback);
server->TerminateConnections(ServerHolder::ConnectionsTerminated);
SPIN_WHILE(!server.closed);
}
TEST_F(InspectorSocketServerTest, ServerWithoutTargets) {
ServerDelegateNoTargets delegate;
ServerHolder server(&delegate, PORT);
ASSERT_TRUE(server->Start(&loop));
TestHttpRequest(PORT, "/json/list", "[ ]");
TestHttpRequest(PORT, "/json", "[ ]");
// Declined connection
SocketWrapper socket(&loop);
socket.Connect("0.0.0.0", PORT);
socket.Write(WsHandshakeRequest(UNCONNECTABLE_TARGET_ID));
socket.Expect("HTTP/1.0 400 Bad Request");
socket.ExpectEOF();
server->Stop(ServerHolder::CloseCallback);
server->TerminateConnections(ServerHolder::ConnectionsTerminated);
SPIN_WHILE(!server.closed);
}
TEST_F(InspectorSocketServerTest, ServerCannotStart) {
ServerDelegateNoTargets delegate1, delegate2;
ServerHolder server1(&delegate1, PORT);
ASSERT_TRUE(server1->Start(&loop));
ServerHolder server2(&delegate2, PORT);
ASSERT_FALSE(server2->Start(&loop));
server1->Stop(ServerHolder::CloseCallback);
server1->TerminateConnections(ServerHolder::ConnectionsTerminated);
server2->Stop(ServerHolder::CloseCallback);
server2->TerminateConnections(ServerHolder::ConnectionsTerminated);
SPIN_WHILE(!server1.closed);
SPIN_WHILE(!server2.closed);
}
TEST_F(InspectorSocketServerTest, StoppingServerDoesNotKillConnections) {
ServerDelegateNoTargets delegate;
ServerHolder server(&delegate, PORT);
ASSERT_TRUE(server->Start(&loop));
SocketWrapper socket1(&loop);
socket1.Connect("0.0.0.0", PORT);
socket1.TestHttpRequest("/json/list", "[ ]");
server->Stop(ServerHolder::CloseCallback);
SPIN_WHILE(!server.closed);
socket1.TestHttpRequest("/json/list", "[ ]");
socket1.Close();
uv_run(&loop, UV_RUN_DEFAULT);
}
Loading…
Cancel
Save