diff --git a/eth/main.cpp b/eth/main.cpp index 5051580a5..8014cc0fa 100644 --- a/eth/main.cpp +++ b/eth/main.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -97,38 +98,40 @@ void interactiveHelp() void help() { cout - << "Usage eth [OPTIONS] " << endl - << "Options:" << endl - << " -a,--address Set the coinbase (mining payout) address to addr (default: auto)." << endl - << " -b,--bootstrap Connect to the default Ethereum peerserver." << endl - << " -c,--client-name Add a name to your client's version string (default: blank)." << endl - << " -d,--db-path Load database from path (default: ~/.ethereum " << endl - << " /Etherum or Library/Application Support/Ethereum)." << endl - << " -f,--force-mining Mine even when there are no transaction to mine (Default: off)" << endl - << " -h,--help Show this help message and exit." << endl - << " -i,--interactive Enter interactive mode (default: non-interactive)." << endl + << "Usage eth [OPTIONS] " << endl + << "Options:" << endl + << " -a,--address Set the coinbase (mining payout) address to addr (default: auto)." << endl + << " -b,--bootstrap Connect to the default Ethereum peerserver." << endl + << " -c,--client-name Add a name to your client's version string (default: blank)." << endl + << " -d,--db-path Load database from path (default: ~/.ethereum " << endl + << " /Etherum or Library/Application Support/Ethereum)." << endl + << " -f,--force-mining Mine even when there are no transaction to mine (Default: off)" << endl + << " -h,--help Show this help message and exit." << endl + << " -i,--interactive Enter interactive mode (default: non-interactive)." << endl #if ETH_JSONRPC - << " -j,--json-rpc Enable JSON-RPC server (default: off)." << endl - << " --json-rpc-port Specify JSON-RPC server port (implies '-j', default: 8080)." << endl + << " -j,--json-rpc Enable JSON-RPC server (default: off)." << endl + << " --json-rpc-port Specify JSON-RPC server port (implies '-j', default: 8080)." << endl #endif - << " -l,--listen Listen on the given port for incoming connected (default: 30303)." << endl - << " -m,--mining Enable mining, optionally for a specified number of blocks (Default: off)" << endl - << " -n,--upnp Use upnp for NAT (default: on)." << endl - << " -L,--local-networking Use peers whose addresses are local." << endl - << " -o,--mode Start a full node or a peer node (Default: full)." << endl - << " -p,--port Connect to remote port (default: 30303)." << endl - << " -r,--remote Connect to remote host (default: none)." << endl - << " -s,--secret Set the secret key for use with send command (default: auto)." << endl - << " -t,--miners Number of mining threads to start (Default: " << thread::hardware_concurrency() << ")" << endl - << " -u,--public-ip Force public ip to given (default; auto)." << endl - << " -v,--verbosity <0 - 9> Set the log verbosity from 0 to 9 (Default: 8)." << endl - << " -x,--peers Attempt to connect to given number of peers (Default: 5)." << endl - << " -V,--version Show the version and exit." << endl + << " -l,--listen Listen on the given port for incoming connected (default: 30303)." << endl + << " -m,--mining Enable mining, optionally for a specified number of blocks (Default: off)" << endl + << " -n,--upnp Use upnp for NAT (default: on)." << endl + << " -L,--local-networking Use peers whose addresses are local." << endl + << " -o,--mode Start a full node or a peer node (Default: full)." << endl + << " -p,--port Connect to remote port (default: 30303)." << endl + << " -r,--remote Connect to remote host (default: none)." << endl + << " -s,--secret Set the secret key for use with send command (default: auto)." << endl + << " --structured-logging Enables structured logging." << endl + << " --structured-logging-format Give time format string for structured logging output." << endl + << " -t,--miners Number of mining threads to start (Default: " << thread::hardware_concurrency() << ")" << endl + << " -u,--public-ip Force public ip to given (default; auto)." << endl + << " -v,--verbosity <0 - 9> Set the log verbosity from 0 to 9 (Default: 8)." << endl + << " -x,--peers Attempt to connect to given number of peers (Default: 5)." << endl + << " -V,--version Show the version and exit." << endl #if ETH_EVMJIT - << " --jit Use EVM JIT (default: off)." << endl + << " --jit Use EVM JIT (default: off)." << endl #endif ; - exit(0); + exit(0); } string credits(bool _interactive = false) @@ -207,6 +210,8 @@ int main(int argc, char** argv) bool useLocal = false; bool forceMining = false; bool jit = false; + bool structuredLogging = false; + string structuredLoggingFormat = "%Y-%m-%dT%H:%M:%S"; string clientName; // Init defaults @@ -279,6 +284,10 @@ int main(int argc, char** argv) } else if ((arg == "-s" || arg == "--secret") && i + 1 < argc) us = KeyPair(h256(fromHex(argv[++i]))); + else if (arg == "--structured-logging-format" && i + 1 < argc) + structuredLoggingFormat = string(argv[++i]); + else if (arg == "--structured-logging") + structuredLogging = true; else if ((arg == "-d" || arg == "--path" || arg == "--db-path") && i + 1 < argc) dbPath = argv[++i]; else if ((arg == "-m" || arg == "--mining") && i + 1 < argc) @@ -350,11 +359,13 @@ int main(int argc, char** argv) cout << credits(); + StructuredLogger::get().initialize(structuredLogging, structuredLoggingFormat); VMFactory::setKind(jit ? VMKind::JIT : VMKind::Interpreter); NetworkPreferences netPrefs(listenPort, publicIP, upnp, useLocal); auto nodesState = contents((dbPath.size() ? dbPath : getDataDir()) + "/network.rlp"); + std::string clientImplString = "Ethereum(++)/" + clientName + "v" + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM) + (jit ? "/JIT" : ""); dev::WebThreeDirect web3( - "Ethereum(++)/" + clientName + "v" + dev::Version + "/" DEV_QUOTED(ETH_BUILD_TYPE) "/" DEV_QUOTED(ETH_BUILD_PLATFORM) + (jit ? "/JIT" : ""), + clientImplString, dbPath, false, mode == NodeMode::Full ? set{"eth", "shh"} : set(), @@ -364,7 +375,7 @@ int main(int argc, char** argv) ); web3.setIdealPeerCount(peers); eth::Client* c = mode == NodeMode::Full ? web3.ethereum() : nullptr; - + StructuredLogger::starting(clientImplString, dev::Version); if (c) { c->setForceMining(forceMining); @@ -525,7 +536,7 @@ int main(int argc, char** argv) string sdata; iss >> hexAddr >> amount >> gasPrice >> gas >> sechex >> sdata; - + cnote << "Data:"; cnote << sdata; bytes data = dev::eth::parseData(sdata); @@ -608,7 +619,7 @@ int main(int argc, char** argv) if (size > 0) cwarn << "Invalid address length:" << size; } - else + else { auto const& bc =c->blockChain(); auto h = bc.currentHash(); @@ -630,7 +641,7 @@ int main(int argc, char** argv) cwarn << "transaction rejected"; } } - } + } else cwarn << "Require parameters: send ADDRESS AMOUNT"; } @@ -688,7 +699,7 @@ int main(int argc, char** argv) cwarn << "Minimum gas amount is" << minGas; else c->transact(us.secret(), endowment, init, gas, gasPrice); - } + } else cwarn << "Require parameters: contract ENDOWMENT GASPRICE GAS CODEHEX"; } @@ -806,7 +817,7 @@ int main(int argc, char** argv) string hexSec; iss >> hexSec; us = KeyPair(h256(fromHex(hexSec))); - } + } else cwarn << "Require parameter: setSecret HEXSECRETKEY"; } @@ -847,7 +858,7 @@ int main(int argc, char** argv) RLPStream config(2); config << us.secret() << coinbase; writeFile(path, config.out()); - } + } else cwarn << "Require parameter: exportConfig PATH"; } @@ -863,10 +874,10 @@ int main(int argc, char** argv) RLP config(b); us = KeyPair(config[0].toHash()); coinbase = config[1].toHash
(); - } + } else cwarn << path << "has no content!"; - } + } else cwarn << "Require parameter: importConfig PATH"; } @@ -898,6 +909,7 @@ int main(int argc, char** argv) while (!g_exit) this_thread::sleep_for(chrono::milliseconds(1000)); + StructuredLogger::stopping(clientImplString, dev::Version); auto netData = web3.saveNetwork(); if (!netData.empty()) writeFile((dbPath.size() ? dbPath : getDataDir()) + "/network.rlp", netData); diff --git a/libdevcore/CMakeLists.txt b/libdevcore/CMakeLists.txt index e3851b7ce..a527aa7b0 100644 --- a/libdevcore/CMakeLists.txt +++ b/libdevcore/CMakeLists.txt @@ -12,6 +12,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DSTATICLIB") aux_source_directory(. SRC_LIST) +include_directories(BEFORE ${JSONCPP_INCLUDE_DIRS}) include_directories(BEFORE ..) include_directories(${Boost_INCLUDE_DIRS}) @@ -27,6 +28,7 @@ endif() target_link_libraries(${EXECUTABLE} ${Boost_THREAD_LIBRARIES}) target_link_libraries(${EXECUTABLE} ${Boost_SYSTEM_LIBRARIES}) +target_link_libraries(${EXECUTABLE} ${JSONCPP_LIBRARIES}) # transitive dependencies for windows executables if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") diff --git a/libdevcore/StructuredLogger.cpp b/libdevcore/StructuredLogger.cpp new file mode 100644 index 000000000..d93a9496e --- /dev/null +++ b/libdevcore/StructuredLogger.cpp @@ -0,0 +1,194 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file StructuredLogger.h + * @author Lefteris Karapetsas + * @date 2015 + * + * A simple helper class for the structured logging + */ + +#include "StructuredLogger.h" + +#include +#include +#include "Guards.h" + +using namespace std; + +namespace dev +{ + +string StructuredLogger::timePointToString(chrono::system_clock::time_point const& _ts) +{ + // not using C++11 std::put_time due to gcc bug + // http://stackoverflow.com/questions/14136833/stdput-time-implementation-status-in-gcc + + char buffer[64]; + time_t time = chrono::system_clock::to_time_t(_ts); + tm* ptm = localtime(&time); + if (strftime(buffer, sizeof(buffer), get().m_timeFormat.c_str(), ptm)) + return string(buffer); + return ""; +} + +void StructuredLogger::outputJson(Json::Value const& _value, std::string const& _name) const +{ + Json::Value event; + static Mutex s_lock; + Guard l(s_lock); + event[_name] = _value; + cout << event << endl << flush; +} + +void StructuredLogger::starting(string const& _clientImpl, const char* _ethVersion) +{ + if (get().m_enabled) + { + Json::Value event; + event["client_impl"] = _clientImpl; + event["eth_version"] = std::string(_ethVersion); + event["ts"] = timePointToString(std::chrono::system_clock::now()); + + get().outputJson(event, "starting"); + } +} + +void StructuredLogger::stopping(string const& _clientImpl, const char* _ethVersion) +{ + if (get().m_enabled) + { + Json::Value event; + event["client_impl"] = _clientImpl; + event["eth_version"] = std::string(_ethVersion); + event["ts"] = timePointToString(std::chrono::system_clock::now()); + + get().outputJson(event, "stopping"); + } +} + +void StructuredLogger::p2pConnected( + string const& _id, + bi::tcp::endpoint const& _addr, + chrono::system_clock::time_point const& _ts, + string const& _remoteVersion, + unsigned int _numConnections) +{ + if (get().m_enabled) + { + std::stringstream addrStream; + addrStream << _addr; + Json::Value event; + event["remote_version_string"] = _remoteVersion; + event["remote_addr"] = addrStream.str(); + event["remote_id"] = _id; + event["num_connections"] = Json::Value(_numConnections); + event["ts"] = timePointToString(_ts); + + get().outputJson(event, "p2p.connected"); + } +} + +void StructuredLogger::p2pDisconnected(string const& _id, bi::tcp::endpoint const& _addr, unsigned int _numConnections) +{ + if (get().m_enabled) + { + std::stringstream addrStream; + addrStream << _addr; + Json::Value event; + event["remote_addr"] = addrStream.str(); + event["remote_id"] = _id; + event["num_connections"] = Json::Value(_numConnections); + event["ts"] = timePointToString(chrono::system_clock::now()); + + get().outputJson(event, "p2p.disconnected"); + } +} + +void StructuredLogger::minedNewBlock( + string const& _hash, + string const& _blockNumber, + string const& _chainHeadHash, + string const& _prevHash) +{ + if (get().m_enabled) + { + Json::Value event; + event["block_hash"] = _hash; + event["block_number"] = _blockNumber; + event["chain_head_hash"] = _chainHeadHash; + event["ts"] = timePointToString(std::chrono::system_clock::now()); + event["block_prev_hash"] = _prevHash; + + get().outputJson(event, "eth.miner.new_block"); + } +} + +void StructuredLogger::chainReceivedNewBlock( + string const& _hash, + string const& _blockNumber, + string const& _chainHeadHash, + string const& _remoteID, + string const& _prevHash) +{ + if (get().m_enabled) + { + Json::Value event; + event["block_hash"] = _hash; + event["block_number"] = _blockNumber; + event["chain_head_hash"] = _chainHeadHash; + event["remote_id"] = _remoteID; + event["ts"] = timePointToString(chrono::system_clock::now()); + event["block_prev_hash"] = _prevHash; + + get().outputJson(event, "eth.chain.received.new_block"); + } +} + +void StructuredLogger::chainNewHead( + string const& _hash, + string const& _blockNumber, + string const& _chainHeadHash, + string const& _prevHash) +{ + if (get().m_enabled) + { + Json::Value event; + event["block_hash"] = _hash; + event["block_number"] = _blockNumber; + event["chain_head_hash"] = _chainHeadHash; + event["ts"] = timePointToString(chrono::system_clock::now()); + event["block_prev_hash"] = _prevHash; + + get().outputJson(event, "eth.miner.new_block"); + } +} + +void StructuredLogger::transactionReceived(string const& _hash, string const& _remoteId) +{ + if (get().m_enabled) + { + Json::Value event; + event["tx_hash"] = _hash; + event["remote_id"] = _remoteId; + event["ts"] = timePointToString(chrono::system_clock::now()); + + get().outputJson(event, "eth.tx.received"); + } +} + + +} diff --git a/libdevcore/StructuredLogger.h b/libdevcore/StructuredLogger.h new file mode 100644 index 000000000..58b408ede --- /dev/null +++ b/libdevcore/StructuredLogger.h @@ -0,0 +1,104 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file StructuredLogger.h + * @author Lefteris Karapetsas + * @date 2015 + * + * A simple helper class for the structured logging + * The spec for the implemented log events is here: + * https://github.com/ethereum/system-testing/wiki/Log-Events + */ + +#pragma once + +#include +#include +#include + +namespace Json { class Value; } + +namespace dev +{ + +// TODO: Make the output stream configurable. stdout, stderr, file e.t.c. +class StructuredLogger +{ +public: + /** + * Initializes the structured logger object + * @param _enabled Whether logging is on or off + * @param _timeFormat A time format string as described here: + * http://en.cppreference.com/w/cpp/chrono/c/strftime + * with which to display timestamps + */ + void initialize(bool _enabled, std::string const& _timeFormat) + { + m_enabled = _enabled; + m_timeFormat = _timeFormat; + } + + static StructuredLogger& get() + { + static StructuredLogger instance; + return instance; + } + + static void starting(std::string const& _clientImpl, const char* _ethVersion); + static void stopping(std::string const& _clientImpl, const char* _ethVersion); + static void p2pConnected( + std::string const& _id, + bi::tcp::endpoint const& _addr, + std::chrono::system_clock::time_point const& _ts, + std::string const& _remoteVersion, + unsigned int _numConnections + ); + static void p2pDisconnected(std::string const& _id, bi::tcp::endpoint const& _addr, unsigned int _numConnections); + static void minedNewBlock( + std::string const& _hash, + std::string const& _blockNumber, + std::string const& _chainHeadHash, + std::string const& _prevHash + ); + static void chainReceivedNewBlock( + std::string const& _hash, + std::string const& _blockNumber, + std::string const& _chainHeadHash, + std::string const& _remoteID, + std::string const& _prevHash + ); + static void chainNewHead( + std::string const& _hash, + std::string const& _blockNumber, + std::string const& _chainHeadHash, + std::string const& _prevHash + ); + static void transactionReceived(std::string const& _hash, std::string const& _remoteId); +private: + // Singleton class. Private default ctor and no copying + StructuredLogger() = default; + StructuredLogger(StructuredLogger const&) = delete; + void operator=(StructuredLogger const&) = delete; + + /// @returns a string representation of a timepoint + static std::string timePointToString(std::chrono::system_clock::time_point const& _ts); + void outputJson(Json::Value const& _value, std::string const& _name) const; + + bool m_enabled = false; + std::string m_timeFormat = "%Y-%m-%dT%H:%M:%S"; +}; + +} diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index c7c55758b..aabce4d4b 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -317,6 +318,13 @@ h256s BlockChain::import(bytes const& _block, OverlayDB const& _db) } #endif + StructuredLogger::chainReceivedNewBlock( + bi.headerHash(WithoutNonce).abridged(), + bi.nonce.abridged(), + currentHash().abridged(), + "", // TODO: remote id ?? + bi.parentHash.abridged() + ); // cnote << "Parent " << bi.parentHash << " has " << details(bi.parentHash).children.size() << " children."; h256s ret; @@ -331,6 +339,12 @@ h256s BlockChain::import(bytes const& _block, OverlayDB const& _db) } m_extrasDB->Put(m_writeOptions, ldb::Slice("best"), ldb::Slice((char const*)&newHash, 32)); clog(BlockChainNote) << " Imported and best" << td << ". Has" << (details(bi.parentHash).children.size() - 1) << "siblings. Route:" << toString(ret); + StructuredLogger::chainNewHead( + bi.headerHash(WithoutNonce).abridged(), + bi.nonce.abridged(), + currentHash().abridged(), + bi.parentHash.abridged() + ); } else { diff --git a/libethereum/BlockChain.h b/libethereum/BlockChain.h index 0c5587d2a..576deaadf 100644 --- a/libethereum/BlockChain.h +++ b/libethereum/BlockChain.h @@ -96,7 +96,7 @@ public: BlockInfo info(h256 _hash) const { return BlockInfo(block(_hash)); } BlockInfo info() const { return BlockInfo(block()); } - /// Get the familial details concerning a block (or the most recent mined if none given). Thread-safe. + /// Get the familiar details concerning a block (or the most recent mined if none given). Thread-safe. BlockDetails details(h256 _hash) const { return queryExtras(_hash, m_details, x_details, NullBlockDetails); } BlockDetails details() const { return details(currentHash()); } diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index fe07cbb18..b0b95c7bf 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include "Defaults.h" #include "Executive.h" @@ -59,7 +60,7 @@ void VersionChecker::setOk() } } -Client::Client(p2p::Host* _extNet, std::string const& _dbPath, bool _forceClean, u256 _networkId, int miners): +Client::Client(p2p::Host* _extNet, std::string const& _dbPath, bool _forceClean, u256 _networkId, int _miners): Worker("eth"), m_vc(_dbPath), m_bc(_dbPath, !m_vc.ok() || _forceClean), @@ -69,8 +70,8 @@ Client::Client(p2p::Host* _extNet, std::string const& _dbPath, bool _forceClean, { m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); - if (miners > -1) - setMiningThreads(miners); + if (_miners > -1) + setMiningThreads(_miners); else setMiningThreads(); if (_dbPath.size()) @@ -417,6 +418,7 @@ void Client::transact(Secret _secret, u256 _value, Address _dest, bytes const& _ } Transaction t(_value, _gasPrice, _gas, _dest, _data, n, _secret); // cdebug << "Nonce at " << toAddress(_secret) << " pre:" << m_preMine.transactionsFrom(toAddress(_secret)) << " post:" << m_postMine.transactionsFrom(toAddress(_secret)); + StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged()); cnote << "New transaction " << t; m_tq.attemptImport(t.rlp()); } diff --git a/libethereum/Client.h b/libethereum/Client.h index c689363b3..187152672 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -44,6 +44,7 @@ namespace dev { + namespace eth { @@ -166,7 +167,13 @@ class Client: public MinerHost, public Interface, Worker public: /// New-style Constructor. - explicit Client(p2p::Host* _host, std::string const& _dbPath = std::string(), bool _forceClean = false, u256 _networkId = 0, int miners = -1); + explicit Client( + p2p::Host* _host, + std::string const& _dbPath = std::string(), + bool _forceClean = false, + u256 _networkId = 0, + int _miners = -1 + ); /// Destructor. virtual ~Client(); diff --git a/libethereum/Miner.h b/libethereum/Miner.h index fd449e995..5914770d2 100644 --- a/libethereum/Miner.h +++ b/libethereum/Miner.h @@ -32,6 +32,7 @@ namespace dev { + namespace eth { diff --git a/libethereum/State.cpp b/libethereum/State.cpp index 76ad1f269..fad9112db 100644 --- a/libethereum/State.cpp +++ b/libethereum/State.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -820,6 +821,12 @@ void State::completeMine() ret.swapOut(m_currentBytes); m_currentBlock.hash = sha3(RLP(m_currentBytes)[0].data()); cnote << "Mined " << m_currentBlock.hash.abridged() << "(parent: " << m_currentBlock.parentHash.abridged() << ")"; + StructuredLogger::minedNewBlock( + m_currentBlock.hash.abridged(), + m_currentBlock.nonce.abridged(), + "", //TODO: chain head hash here ?? + m_currentBlock.parentHash.abridged() + ); // Quickly reset the transactions. // TODO: Leave this in a better state than this limbo, or at least record that it's in limbo. diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 44a17fcb5..a4f7bcd92 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include "Session.h" @@ -59,7 +60,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, byte for (auto address: m_ifAddresses) if (address.is_v4()) clog(NetNote) << "IP Address: " << address << " = " << (isPrivateAddress(address) ? "[LOCAL]" : "[PEER]"); - + clog(NetNote) << "Id:" << id(); } @@ -88,11 +89,11 @@ void Host::stop() return; m_run = false; } - + // wait for m_timer to reset (indicating network scheduler has stopped) while (!!m_timer) this_thread::sleep_for(chrono::milliseconds(50)); - + // stop worker thread stopWorking(); } @@ -101,12 +102,12 @@ void Host::doneWorking() { // reset ioservice (allows manually polling network, below) m_ioService.reset(); - + // shutdown acceptor m_tcp4Acceptor.cancel(); if (m_tcp4Acceptor.is_open()) m_tcp4Acceptor.close(); - + // There maybe an incoming connection which started but hasn't finished. // Wait for acceptor to end itself instead of assuming it's complete. // This helps ensure a peer isn't stopped at the same time it's starting @@ -133,17 +134,17 @@ void Host::doneWorking() } if (!n) break; - + // poll so that peers send out disconnect packets m_ioService.poll(); } - + // stop network (again; helpful to call before subsequent reset()) m_ioService.stop(); - + // reset network (allows reusing ioservice in future) m_ioService.reset(); - + // finally, clear out peers (in case they're lingering) RecursiveGuard l(x_sessions); m_sessions.clear(); @@ -158,6 +159,13 @@ void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) { { clog(NetNote) << "p2p.host.peer.register" << _s->m_peer->id.abridged(); + StructuredLogger::p2pConnected( + _s->m_peer->id.abridged(), + _s->m_peer->peerEndpoint(), + _s->m_peer->m_lastConnected, + _s->m_info.clientVersion, + peerCount() + ); RecursiveGuard l(x_sessions); // TODO: temporary loose-coupling; if m_peers already has peer, // it is same as _s->m_peer. (fixing next PR) @@ -165,7 +173,7 @@ void Host::registerPeer(std::shared_ptr _s, CapDescs const& _caps) m_peers[_s->m_peer->id] = _s->m_peer; m_sessions[_s->m_peer->id] = _s; } - + unsigned o = (unsigned)UserPacket; for (auto const& i: _caps) if (haveCapability(i)) @@ -181,7 +189,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) if (_e == NodeEntryAdded) { clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n; - + auto n = m_nodeTable->node(_n); if (n) { @@ -198,12 +206,12 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) p->endpoint = NodeIPEndpoint(n.endpoint.udp, n.endpoint.tcp); p->required = n.required; m_peers[_n] = p; - + clog(NetNote) << "p2p.host.peers.events.peersAdded " << _n << p->endpoint.tcp.address() << p->endpoint.udp.address(); } p->endpoint.tcp = n.endpoint.tcp; } - + // TODO: Implement similar to discover. Attempt connecting to nodes // until ideal peer count is reached; if all nodes are tried, // repeat. Notably, this is an integrated process such that @@ -219,7 +227,7 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) else if (_e == NodeEntryRemoved) { clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n; - + RecursiveGuard l(x_sessions); m_peers.erase(_n); } @@ -241,7 +249,7 @@ void Host::seal(bytes& _b) void Host::determinePublic(string const& _publicAddress, bool _upnp) { m_peerAddresses.clear(); - + // no point continuing if there are no interface addresses or valid listen port if (!m_ifAddresses.size() || m_listenPort < 1) return; @@ -250,7 +258,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp) for (auto addr: m_ifAddresses) if ((m_netPrefs.localNetworking || !isPrivateAddress(addr)) && !isLocalHostAddress(addr)) m_peerAddresses.insert(addr); - + // if user supplied address is a public address then we use it // if user supplied address is private, and localnetworking is enabled, we use it bi::address reqPublicAddr(bi::address(_publicAddress.empty() ? bi::address() : bi::address::from_string(_publicAddress))); @@ -264,7 +272,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp) m_tcpPublic = reqPublic; return; } - + // if address wasn't provided, then use first public ipv4 address found for (auto addr: m_peerAddresses) if (addr.is_v4() && !isPrivateAddress(addr)) @@ -272,7 +280,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp) m_tcpPublic = bi::tcp::endpoint(*m_peerAddresses.begin(), m_listenPort); return; } - + // or find address via upnp if (_upnp) { @@ -296,7 +304,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp) m_tcpPublic = bi::tcp::endpoint(addr, m_listenPort); return; } - + // otherwise address is unspecified m_tcpPublic = bi::tcp::endpoint(bi::address(), m_listenPort); } @@ -304,7 +312,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp) void Host::runAcceptor() { assert(m_listenPort > 0); - + if (m_run && !m_accepting) { clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")"; @@ -325,7 +333,7 @@ void Host::runAcceptor() // It's possible for an accepted connection to return an error in which // case the socket may be open and must be closed to prevent asio from // processing socket events after socket is deallocated. - + bi::tcp::socket *s = new bi::tcp::socket(m_ioService); m_tcp4Acceptor.async_accept(*s, [=](boost::system::error_code ec) { @@ -349,7 +357,7 @@ void Host::runAcceptor() clog(NetWarn) << "ERROR: " << _e.what(); } } - + // asio doesn't close socket on error if (!success && s->is_open()) { @@ -360,7 +368,7 @@ void Host::runAcceptor() m_accepting = false; delete s; - + if (ec.value() < 1) runAcceptor(); }); @@ -376,7 +384,7 @@ void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId) shared_ptr p; if (_nodeId) p = m_peers[_nodeId]; - + if (!p) p.reset(new Peer()); p->endpoint.tcp.address(_socket->remote_endpoint().address()); @@ -399,16 +407,16 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short this_thread::sleep_for(chrono::milliseconds(50)); if (!m_run) return; - + if (_tcpPeerPort < 30300 || _tcpPeerPort > 30305) cwarn << "Non-standard port being recorded: " << _tcpPeerPort; - + if (_tcpPeerPort >= /*49152*/32768) { cwarn << "Private port being recorded - setting to 0"; _tcpPeerPort = 0; } - + boost::system::error_code ec; bi::address addr = bi::address::from_string(_addr, ec); if (ec) @@ -435,20 +443,20 @@ void Host::connect(std::shared_ptr const& _p) this_thread::sleep_for(chrono::milliseconds(50)); if (!m_run) return; - + if (havePeerSession(_p->id)) { clog(NetWarn) << "Aborted connect. Node already connected."; return; } - + if (!m_nodeTable->haveNode(_p->id)) { clog(NetWarn) << "Aborted connect. Node not in node table."; m_nodeTable->addNode(*_p.get()); return; } - + // prevent concurrently connecting to a node Peer *nptr = _p.get(); { @@ -457,7 +465,7 @@ void Host::connect(std::shared_ptr const& _p) return; m_pendingPeerConns.insert(nptr); } - + clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged(); bi::tcp::socket* s = new bi::tcp::socket(m_ioService); s->async_connect(_p->peerEndpoint(), [=](boost::system::error_code const& ec) @@ -474,9 +482,10 @@ void Host::connect(std::shared_ptr const& _p) _p->m_lastDisconnect = NoDisconnect; _p->m_lastConnected = std::chrono::system_clock::now(); _p->m_failedAttempts = 0; + auto ps = make_shared(this, std::move(*s), _p); ps->start(); - + } delete s; Guard l(x_pendingNodeConns); @@ -515,22 +524,22 @@ void Host::run(boost::system::error_code const&) { // reset NodeTable m_nodeTable.reset(); - + // stopping io service allows running manual network operations for shutdown // and also stops blocking worker thread, allowing worker thread to exit m_ioService.stop(); - + // resetting timer signals network that nothing else can be scheduled to run m_timer.reset(); return; } - + m_nodeTable->processEvents(); - + for (auto p: m_sessions) if (auto pp = p.second.lock()) pp->serviceNodesRequest(); - + keepAlivePeers(); disconnectLatePeers(); @@ -544,15 +553,15 @@ void Host::run(boost::system::error_code const&) connect(p.second); break; } - + if (c < m_idealPeerCount) m_nodeTable->discover(); - + auto runcb = [this](boost::system::error_code const& error) { run(error); }; m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval)); m_timer->async_wait(runcb); } - + void Host::startedWorking() { asserts(!m_timer); @@ -566,33 +575,33 @@ void Host::startedWorking() m_timer.reset(new boost::asio::deadline_timer(m_ioService)); m_run = true; } - + // try to open acceptor (todo: ipv6) m_listenPort = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs.listenPort); - + // start capability threads for (auto const& h: m_capabilities) h.second->onStarting(); - + // determine public IP, but only if we're able to listen for connections // todo: GUI when listen is unavailable in UI if (m_listenPort) { determinePublic(m_netPrefs.publicIP, m_netPrefs.upnp); - + if (m_listenPort > 0) runAcceptor(); } else clog(NetNote) << "p2p.start.notice id:" << id().abridged() << "Listen port is invalid or unavailable. Node Table using default port (30303)."; - + // TODO: add m_tcpPublic endpoint; sort out endpoint stuff for nodetable m_nodeTable.reset(new NodeTable(m_ioService, m_alias, m_listenPort > 0 ? m_listenPort : 30303)); m_nodeTable->setEventHandler(new HostNodeTableHandler(*this)); restoreNetwork(&m_restoreNetwork); - + clog(NetNote) << "p2p.started id:" << id().abridged(); - + run(boost::system::error_code()); } @@ -606,7 +615,7 @@ void Host::keepAlivePeers() { if (chrono::steady_clock::now() - c_keepAliveInterval < m_lastPing) return; - + RecursiveGuard l(x_sessions); for (auto p: m_sessions) if (auto pp = p.second.lock()) @@ -641,7 +650,7 @@ bytes Host::saveNetwork() const peers.push_back(*p.second); } peers.sort(); - + RLPStream network; int count = 0; { @@ -683,7 +692,7 @@ bytes Host::saveNetwork() const count++; } } - + RLPStream ret(3); ret << 1 << m_alias.secret(); ret.appendList(count).appendRaw(network.out(), count); @@ -695,7 +704,7 @@ void Host::restoreNetwork(bytesConstRef _b) // nodes can only be added if network is added if (!isStarted()) BOOST_THROW_EXCEPTION(NetworkStartRequired()); - + RecursiveGuard l(x_sessions); RLP r(_b); if (r.itemCount() > 0 && r[0].isInt() && r[0].toInt() == 1) diff --git a/libp2p/Host.h b/libp2p/Host.h index baf8f0585..48e678030 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -56,7 +56,7 @@ public: HostNodeTableHandler(Host& _host); Host const& host() const { return m_host; } - + private: virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e); @@ -82,23 +82,27 @@ class Host: public Worker friend class HostNodeTableHandler; friend class Session; friend class HostCapabilityFace; - + public: /// Start server, listening for connections on the given port. - Host(std::string const& _clientVersion, NetworkPreferences const& _n = NetworkPreferences(), bytesConstRef _restoreNetwork = bytesConstRef()); + Host( + std::string const& _clientVersion, + NetworkPreferences const& _n = NetworkPreferences(), + bytesConstRef _restoreNetwork = bytesConstRef() + ); /// Will block on network process events. virtual ~Host(); - + /// Interval at which Host::run will call keepAlivePeers to ping peers. std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30); /// Disconnect timeout after failure to respond to keepAlivePeers ping. std::chrono::milliseconds const c_keepAliveTimeOut = std::chrono::milliseconds(1000); - + /// Default host for current version of client. static std::string pocHost(); - + /// Basic peer network protocol version. unsigned protocolVersion() const; @@ -108,23 +112,23 @@ public: bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; } CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } } - + bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? !!m_sessions[_id].lock() : false; } - + void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort); - + /// Set ideal number of peers. void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } /// Get peer information. PeerSessionInfos peerSessionInfo() const; - + /// Get number of peers connected. size_t peerCount() const; - + /// Get the address we're listening on currently. std::string listenAddress() const { return m_tcpPublic.address().to_string(); } - + /// Get the port we're listening on currently. unsigned short listenPort() const { return m_tcpPublic.port(); } @@ -138,11 +142,11 @@ public: /// Start network. @threadsafe void start(); - + /// Stop network. @threadsafe /// Resets acceptor, socket, and IO service. Called by deallocator. void stop(); - + /// @returns if network is running. bool isStarted() const { return m_run; } @@ -155,25 +159,25 @@ protected: /// Deserialise the data and populate the set of known peers. void restoreNetwork(bytesConstRef _b); - + private: /// Populate m_peerAddresses with available public addresses. void determinePublic(std::string const& _publicAddress, bool _upnp); - + void connect(std::shared_ptr const& _p); - + /// Ping the peers to update the latency information and disconnect peers which have timed out. void keepAlivePeers(); - + /// Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut. void disconnectLatePeers(); - + /// Called only from startedWorking(). void runAcceptor(); - + /// Handler for verifying handshake siganture before creating session. _nodeId is passed for outbound connections. If successful, socket is moved to Session via std::move. void doHandshake(bi::tcp::socket* _socket, NodeId _nodeId = NodeId()); - + void seal(bytes& _b); /// Called by Worker. Not thread-safe; to be called only by worker. @@ -183,7 +187,7 @@ private: /// Run network. Not thread-safe; to be called only by worker. virtual void doWork(); - + /// Shutdown network. Not thread-safe; to be called only by worker. virtual void doneWorking(); @@ -191,14 +195,14 @@ private: static KeyPair networkAlias(bytesConstRef _b); bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes. - + bool m_run = false; ///< Whether network is running. std::mutex x_runTimer; ///< Start/stop mutex. - + std::string m_clientVersion; ///< Our version string. NetworkPreferences m_netPrefs; ///< Network settings. - + /// Interface addresses (private, public) std::vector m_ifAddresses; ///< Interface addresses. @@ -206,10 +210,10 @@ private: ba::io_service m_ioService; ///< IOService for network stuff. bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor. - + std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. - + std::set m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). Mutex x_pendingNodeConns; @@ -219,22 +223,21 @@ private: /// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer; std::map> m_peers; - + /// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run()) /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. mutable std::map> m_sessions; mutable RecursiveMutex x_sessions; unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to. - + std::set m_peerAddresses; ///< Public addresses that peers (can) know us by. std::map> m_capabilities; ///< Each of the capabilities we support. std::chrono::steady_clock::time_point m_lastPing; ///< Time we sent the last ping to all peers. - bool m_accepting = false; }; - + } } diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 6ff765cf6..ec0a2d0d7 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "Host.h" #include "Capability.h" @@ -467,6 +468,11 @@ void Session::drop(DisconnectReason _reason) void Session::disconnect(DisconnectReason _reason) { clogS(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")"; + StructuredLogger::p2pDisconnected( + m_info.id.abridged(), + m_peer->peerEndpoint(), + m_server->peerCount() + ); if (m_socket.is_open()) { RLPStream s; diff --git a/libwebthree/WebThree.cpp b/libwebthree/WebThree.cpp index bee297a79..1cf478746 100644 --- a/libwebthree/WebThree.cpp +++ b/libwebthree/WebThree.cpp @@ -35,15 +35,21 @@ using namespace dev::p2p; using namespace dev::eth; using namespace dev::shh; -WebThreeDirect::WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean, std::set const& _interfaces, NetworkPreferences const& _n, bytesConstRef _network, int miners): +WebThreeDirect::WebThreeDirect( + std::string const& _clientVersion, + std::string const& _dbPath, + bool _forceClean, + std::set const& _interfaces, + NetworkPreferences const& _n, + bytesConstRef _network, int _miners +): m_clientVersion(_clientVersion), m_net(_clientVersion, _n, _network) { if (_dbPath.size()) Defaults::setDBPath(_dbPath); if (_interfaces.count("eth")) - m_ethereum.reset(new eth::Client(&m_net, _dbPath, _forceClean, 0, miners)); - + m_ethereum.reset(new eth::Client(&m_net, _dbPath, _forceClean, 0, _miners)); if (_interfaces.count("shh")) m_whisper = m_net.registerCapability(new WhisperHost); diff --git a/libwebthree/WebThree.h b/libwebthree/WebThree.h index 4fa1d1fe5..4a3bee59b 100644 --- a/libwebthree/WebThree.h +++ b/libwebthree/WebThree.h @@ -103,7 +103,15 @@ class WebThreeDirect : public WebThreeNetworkFace public: /// Constructor for private instance. If there is already another process on the machine using @a _dbPath, then this will throw an exception. /// ethereum() may be safely static_cast()ed to a eth::Client*. - WebThreeDirect(std::string const& _clientVersion, std::string const& _dbPath, bool _forceClean = false, std::set const& _interfaces = {"eth", "shh"}, p2p::NetworkPreferences const& _n = p2p::NetworkPreferences(), bytesConstRef _network = bytesConstRef(), int miners = -1); + WebThreeDirect( + std::string const& _clientVersion, + std::string const& _dbPath, + bool _forceClean = false, + std::set const& _interfaces = {"eth", "shh"}, + p2p::NetworkPreferences const& _n = p2p::NetworkPreferences(), + bytesConstRef _network = bytesConstRef(), + int _miners = -1 + ); /// Destructor. ~WebThreeDirect();