diff --git a/libethereum/CommonNet.cpp b/libethereum/CommonNet.cpp index 84443eb11..9bc277f86 100644 --- a/libethereum/CommonNet.cpp +++ b/libethereum/CommonNet.cpp @@ -22,41 +22,3 @@ #include "CommonNet.h" using namespace std; using namespace eth; - -// Helper function to determine if an address falls within one of the reserved ranges -// For V4: -// Class A "10.*", Class B "172.[16->31].*", Class C "192.168.*" -// Not implemented yet for V6 -bool eth::isPrivateAddress(bi::address _addressToCheck) -{ - if (_addressToCheck.is_v4()) - { - bi::address_v4 v4Address = _addressToCheck.to_v4(); - bi::address_v4::bytes_type bytesToCheck = v4Address.to_bytes(); - if (bytesToCheck[0] == 10 || bytesToCheck[0] == 127) - return true; - if (bytesToCheck[0] == 172 && (bytesToCheck[1] >= 16 && bytesToCheck[1] <= 31)) - return true; - if (bytesToCheck[0] == 192 && bytesToCheck[1] == 168) - return true; - } - return false; -} - -std::string eth::reasonOf(DisconnectReason _r) -{ - switch (_r) - { - case DisconnectRequested: return "Disconnect was requested."; - case TCPError: return "Low-level TCP communication error."; - case BadProtocol: return "Data format error."; - case UselessPeer: return "Peer had no use for this node."; - case TooManyPeers: return "Peer had too many connections."; - case DuplicatePeer: return "Peer was already connected."; - case WrongGenesis: return "Disagreement over genesis block."; - case IncompatibleProtocol: return "Peer protocol versions are incompatible."; - case ClientQuit: return "Peer is exiting."; - default: return "Unknown reason."; - } -} - diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index b76893d5c..0b848e2c3 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -24,89 +24,34 @@ #pragma once #include -#include -#include #include #include #include -namespace ba = boost::asio; -namespace bi = boost::asio::ip; namespace eth { -bool isPrivateAddress(bi::address _addressToCheck); - static const eth::uint c_maxHashes = 32; ///< Maximum number of hashes BlockHashes will ever send. static const eth::uint c_maxHashesAsk = 32; ///< Maximum number of hashes GetBlockHashes will ever ask for. static const eth::uint c_maxBlocks = 16; ///< Maximum number of blocks Blocks will ever send. static const eth::uint c_maxBlocksAsk = 16; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). +class UPnP; class OverlayDB; class BlockChain; class TransactionQueue; class EthereumHost; class EthereumSession; -struct NetWarn: public LogChannel { static const char* name() { return "!N!"; } static const int verbosity = 0; }; -struct NetNote: public LogChannel { static const char* name() { return "*N*"; } static const int verbosity = 1; }; -struct NetMessageSummary: public LogChannel { static const char* name() { return "-N-"; } static const int verbosity = 2; }; -struct NetConnect: public LogChannel { static const char* name() { return "+N+"; } static const int verbosity = 4; }; -struct NetMessageDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 5; }; -struct NetTriviaSummary: public LogChannel { static const char* name() { return "-N-"; } static const int verbosity = 10; }; -struct NetTriviaDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 11; }; -struct NetAllDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 13; }; -struct NetRight: public LogChannel { static const char* name() { return ">N>"; } static const int verbosity = 14; }; -struct NetLeft: public LogChannel { static const char* name() { return " -#ifdef _WIN32 -// winsock is already included -// #include -#else -#include -#endif - #include #include #include #include +#include #include #include #include "BlockChain.h" @@ -44,329 +37,19 @@ using namespace std; using namespace eth; -// Addresses we will skip during network interface discovery -// Use a vector as the list is small -// Why this and not names? -// Under MacOSX loopback (127.0.0.1) can be named lo0 and br0 are bridges (0.0.0.0) -static const set c_rejectAddresses = { - {bi::address_v4::from_string("127.0.0.1")}, - {bi::address_v6::from_string("::1")}, - {bi::address_v4::from_string("0.0.0.0")}, - {bi::address_v6::from_string("::")} -}; - -EthereumHost::EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, unsigned short _port, NodeMode _m, string const& _publicAddress, bool _upnp): - m_clientVersion(_clientVersion), - m_mode(_m), - m_listenPort(_port), +EthereumHost::EthereumHost(BlockChain const& _ch, u256 _networkId): m_chain(&_ch), - m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), _port)), - m_socket(m_ioService), - m_key(KeyPair::create()), m_networkId(_networkId) { - populateAddresses(); - determinePublic(_publicAddress, _upnp); - ensureAccepting(); - clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)) << "Mode: " << (_m == NodeMode::PeerServer ? "PeerServer" : "Full"); -} - -EthereumHost::EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, NodeMode _m, string const& _publicAddress, bool _upnp): - m_clientVersion(_clientVersion), - m_mode(_m), - m_listenPort(0), - m_chain(&_ch), - m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)), - m_socket(m_ioService), - m_key(KeyPair::create()), - m_networkId(_networkId) -{ - m_listenPort = m_acceptor.local_endpoint().port(); - - // populate addresses. - populateAddresses(); - determinePublic(_publicAddress, _upnp); - ensureAccepting(); - clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)) << "Mode: " << (m_mode == NodeMode::PeerServer ? "PeerServer" : "Full"); -} - -EthereumHost::EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, NodeMode _m): - m_clientVersion(_clientVersion), - m_mode(_m), - m_listenPort(0), - m_chain(&_ch), - m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)), - m_socket(m_ioService), - m_key(KeyPair::create()), - m_networkId(_networkId) -{ - // populate addresses. - populateAddresses(); - clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4)) << "Mode: " << (m_mode == NodeMode::PeerServer ? "PeerServer" : "Full"); } EthereumHost::~EthereumHost() { - disconnectPeers(); - - for (auto i: m_peers) + for (auto i: host()->m_peers) if (shared_ptr p = i.second.lock()) p->giveUpOnFetch(); } -void EthereumHost::registerPeer(std::shared_ptr _s) -{ - Guard l(x_peers); - m_peers[_s->m_id] = _s; -} - -void EthereumHost::disconnectPeers() -{ - for (unsigned n = 0;; n = 0) - { - { - Guard l(x_peers); - for (auto i: m_peers) - if (auto p = i.second.lock()) - { - p->disconnect(ClientQuit); - n++; - } - } - if (!n) - break; - m_ioService.poll(); - this_thread::sleep_for(chrono::milliseconds(100)); - } - - delete m_upnp; -} - -unsigned EthereumHost::protocolVersion() -{ - return c_protocolVersion; -} - -void EthereumHost::seal(bytes& _b) -{ - _b[0] = 0x22; - _b[1] = 0x40; - _b[2] = 0x08; - _b[3] = 0x91; - uint32_t len = (uint32_t)_b.size() - 8; - _b[4] = (len >> 24) & 0xff; - _b[5] = (len >> 16) & 0xff; - _b[6] = (len >> 8) & 0xff; - _b[7] = len & 0xff; -} - -void EthereumHost::determinePublic(string const& _publicAddress, bool _upnp) -{ - if (_upnp) - try - { - m_upnp = new UPnP; - } - catch (NoUPnPDevice) {} // let m_upnp continue as null - we handle it properly. - - bi::tcp::resolver r(m_ioService); - if (m_upnp && m_upnp->isValid() && m_peerAddresses.size()) - { - clog(NetNote) << "External addr: " << m_upnp->externalIP(); - int p = m_upnp->addRedirect(m_peerAddresses[0].to_string().c_str(), m_listenPort); - if (p) - clog(NetNote) << "Punched through NAT and mapped local port" << m_listenPort << "onto external port" << p << "."; - else - { - // couldn't map - clog(NetWarn) << "Couldn't punch through NAT (or no NAT in place). Assuming " << m_listenPort << " is local & external port."; - p = m_listenPort; - } - - auto eip = m_upnp->externalIP(); - if (eip == string("0.0.0.0") && _publicAddress.empty()) - m_public = bi::tcp::endpoint(bi::address(), (unsigned short)p); - else - { - m_public = bi::tcp::endpoint(bi::address::from_string(_publicAddress.empty() ? eip : _publicAddress), (unsigned short)p); - m_addresses.push_back(m_public.address().to_v4()); - } - } - else - { - // No UPnP - fallback on given public address or, if empty, the assumed peer address. - m_public = bi::tcp::endpoint(_publicAddress.size() ? bi::address::from_string(_publicAddress) - : m_peerAddresses.size() ? m_peerAddresses[0] - : bi::address(), m_listenPort); - m_addresses.push_back(m_public.address().to_v4()); - } -} - -void EthereumHost::populateAddresses() -{ -#ifdef _WIN32 - WSAData wsaData; - if (WSAStartup(MAKEWORD(1, 1), &wsaData) != 0) - throw NoNetworking(); - - char ac[80]; - if (gethostname(ac, sizeof(ac)) == SOCKET_ERROR) - { - clog(NetWarn) << "Error " << WSAGetLastError() << " when getting local host name."; - WSACleanup(); - throw NoNetworking(); - } - - struct hostent* phe = gethostbyname(ac); - if (phe == 0) - { - clog(NetWarn) << "Bad host lookup."; - WSACleanup(); - throw NoNetworking(); - } - - for (int i = 0; phe->h_addr_list[i] != 0; ++i) - { - struct in_addr addr; - memcpy(&addr, phe->h_addr_list[i], sizeof(struct in_addr)); - char *addrStr = inet_ntoa(addr); - bi::address ad(bi::address::from_string(addrStr)); - m_addresses.push_back(ad.to_v4()); - bool isLocal = std::find(c_rejectAddresses.begin(), c_rejectAddresses.end(), ad) != c_rejectAddresses.end(); - if (!isLocal) - m_peerAddresses.push_back(ad.to_v4()); - clog(NetNote) << "Address: " << ac << " = " << m_addresses.back() << (isLocal ? " [LOCAL]" : " [PEER]"); - } - - WSACleanup(); -#else - ifaddrs* ifaddr; - if (getifaddrs(&ifaddr) == -1) - throw NoNetworking(); - - bi::tcp::resolver r(m_ioService); - - for (ifaddrs* ifa = ifaddr; ifa; ifa = ifa->ifa_next) - { - if (!ifa->ifa_addr) - continue; - if (ifa->ifa_addr->sa_family == AF_INET) - { - char host[NI_MAXHOST]; - if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST)) - continue; - try - { - auto it = r.resolve({host, "30303"}); - bi::tcp::endpoint ep = it->endpoint(); - bi::address ad = ep.address(); - m_addresses.push_back(ad.to_v4()); - bool isLocal = std::find(c_rejectAddresses.begin(), c_rejectAddresses.end(), ad) != c_rejectAddresses.end(); - if (!isLocal) - m_peerAddresses.push_back(ad.to_v4()); - clog(NetNote) << "Address: " << host << " = " << m_addresses.back() << (isLocal ? " [LOCAL]" : " [PEER]"); - } - catch (...) - { - clog(NetNote) << "Couldn't resolve: " << host; - } - } - } - - freeifaddrs(ifaddr); -#endif -} - -std::map EthereumHost::potentialPeers() -{ - std::map ret; - if (!m_public.address().is_unspecified()) - ret.insert(make_pair(m_key.pub(), m_public)); - Guard l(x_peers); - for (auto i: m_peers) - if (auto j = i.second.lock()) - { - auto ep = j->endpoint(); - // Skip peers with a listen port of zero or are on a private network - bool peerOnNet = (j->m_listenPort != 0 && !isPrivateAddress(ep.address())); - if (peerOnNet && ep.port() && j->m_id) - ret.insert(make_pair(i.first, ep)); - } - return ret; -} - -void EthereumHost::ensureAccepting() -{ - if (m_accepting == false) - { - clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_public << ")"; - m_accepting = true; - m_acceptor.async_accept(m_socket, [=](boost::system::error_code ec) - { - if (!ec) - try - { - try { - clog(NetConnect) << "Accepted connection from " << m_socket.remote_endpoint(); - } catch (...){} - bi::address remoteAddress = m_socket.remote_endpoint().address(); - // Port defaults to 0 - we let the hello tell us which port the peer listens to - auto p = std::make_shared(this, std::move(m_socket), m_networkId, remoteAddress); - p->start(); - } - catch (std::exception const& _e) - { - clog(NetWarn) << "ERROR: " << _e.what(); - } - m_accepting = false; - if (ec.value() != 1 && (m_mode == NodeMode::PeerServer || peerCount() < m_idealPeerCount * 2)) - ensureAccepting(); - }); - } -} - -void EthereumHost::connect(std::string const& _addr, unsigned short _port) noexcept -{ - try - { - connect(bi::tcp::endpoint(bi::address::from_string(_addr), _port)); - } - catch (exception const& e) - { - // Couldn't connect - clog(NetConnect) << "Bad host " << _addr << " (" << e.what() << ")"; - } -} - -void EthereumHost::connect(bi::tcp::endpoint const& _ep) -{ - clog(NetConnect) << "Attempting connection to " << _ep; - bi::tcp::socket* s = new bi::tcp::socket(m_ioService); - s->async_connect(_ep, [=](boost::system::error_code const& ec) - { - if (ec) - { - clog(NetConnect) << "Connection refused to " << _ep << " (" << ec.message() << ")"; - for (auto i = m_incomingPeers.begin(); i != m_incomingPeers.end(); ++i) - if (i->second.first == _ep && i->second.second < 3) - { - m_freePeers.push_back(i->first); - goto OK; - } - // for-else - clog(NetConnect) << "Giving up."; - OK:; - } - else - { - auto p = make_shared(this, std::move(*s), m_networkId, _ep.address(), _ep.port()); - clog(NetConnect) << "Connected to " << _ep; - p->start(); - } - delete s; - }); -} - h256Set EthereumHost::neededBlocks(h256Set const& _exclude) { Guard l(x_blocksNeeded); @@ -391,20 +74,6 @@ h256Set EthereumHost::neededBlocks(h256Set const& _exclude) return ret; } -bool EthereumHost::havePeer(Public _id) const -{ - Guard l(x_peers); - - // Remove dead peers from list. - for (auto i = m_peers.begin(); i != m_peers.end();) - if (i->second.lock().get()) - ++i; - else - i = m_peers.erase(i); - - return !!m_peers.count(_id); -} - bool EthereumHost::ensureInitialised(TransactionQueue& _tq) { if (m_latestBlockSent == h256()) @@ -448,24 +117,9 @@ bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data) bool EthereumHost::sync(TransactionQueue& _tq, BlockQueue& _bq) { bool netChange = ensureInitialised(_tq); - - if (m_mode == NodeMode::Full) - { - auto h = m_chain->currentHash(); - - maintainTransactions(_tq, h); - maintainBlocks(_bq, h); - - // Connect to additional peers - growPeers(); - } - - // platform for consensus of social contract. - // restricts your freedom but does so fairly. and that's the value proposition. - // guarantees that everyone else respect the rules of the system. (i.e. obeys laws). - - prunePeers(); - + auto h = m_chain->currentHash(); + maintainTransactions(_tq, h); + maintainBlocks(_bq, h); return netChange; } @@ -481,8 +135,8 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash m_incomingTransactions.clear(); // Send any new transactions. - Guard l(x_peers); - for (auto j: m_peers) + Guard l(host()->x_peers); + for (auto j: host()->m_peers) if (auto p = j.second.lock()) { bytes b; @@ -544,8 +198,8 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) ts.swapOut(b); seal(b); - Guard l(x_peers); - for (auto j: m_peers) + Guard l(host()->x_peers); + for (auto j: host()->m_peers) if (auto p = j.second.lock()) { if (!p->m_knownBlocks.count(_currentHash)) @@ -556,40 +210,6 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) } } -void EthereumHost::growPeers() -{ - Guard l(x_peers); - while (m_peers.size() < m_idealPeerCount) - { - if (m_freePeers.empty()) - { - if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) - { - RLPStream s; - bytes b; - (EthereumSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); - seal(b); - for (auto const& i: m_peers) - if (auto p = i.second.lock()) - if (p->isOpen()) - p->send(&b); - m_lastPeersRequest = chrono::steady_clock::now(); - } - - - if (!m_accepting) - ensureAccepting(); - - break; - } - - auto x = time(0) % m_freePeers.size(); - m_incomingPeers[m_freePeers[x]].second++; - connect(m_incomingPeers[m_freePeers[x]].first); - m_freePeers.erase(m_freePeers.begin() + x); - } -} - void EthereumHost::noteHaveChain(std::shared_ptr const& _from) { auto td = _from->m_totalDifficulty; @@ -616,92 +236,9 @@ void EthereumHost::noteHaveChain(std::shared_ptr const& _from) } { - Guard l(x_peers); - for (auto const& i: m_peers) + Guard l(host()->x_peers); + for (auto const& i: host()->m_peers) if (shared_ptr p = i.second.lock()) p->restartGettingChain(); } } - - -void EthereumHost::prunePeers() -{ - Guard l(x_peers); - // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. - for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) - while (m_peers.size() > m_idealPeerCount) - { - // look for worst peer to kick off - // first work out how many are old enough to kick off. - shared_ptr worst; - unsigned agedPeers = 0; - for (auto i: m_peers) - if (auto p = i.second.lock()) - if ((m_mode != NodeMode::PeerServer || p->m_caps != 0x01) && chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. - { - ++agedPeers; - if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones - worst = p; - } - if (!worst || agedPeers <= m_idealPeerCount) - break; - worst->disconnect(TooManyPeers); - } - - // Remove dead peers from list. - for (auto i = m_peers.begin(); i != m_peers.end();) - if (i->second.lock().get()) - ++i; - else - i = m_peers.erase(i); -} - -std::vector EthereumHost::peers(bool _updatePing) const -{ - Guard l(x_peers); - if (_updatePing) - const_cast(this)->pingAll(); - this_thread::sleep_for(chrono::milliseconds(200)); - std::vector ret; - for (auto& i: m_peers) - if (auto j = i.second.lock()) - if (j->m_socket.is_open()) - ret.push_back(j->m_info); - return ret; -} - -void EthereumHost::pingAll() -{ - Guard l(x_peers); - for (auto& i: m_peers) - if (auto j = i.second.lock()) - j->ping(); -} - -bytes EthereumHost::savePeers() const -{ - Guard l(x_peers); - RLPStream ret; - int n = 0; - for (auto& i: m_peers) - if (auto p = i.second.lock()) - if (p->m_socket.is_open() && p->endpoint().port()) - { - ret.appendList(3) << p->endpoint().address().to_v4().to_bytes() << p->endpoint().port() << p->m_id; - n++; - } - return RLPStream(n).appendRaw(ret.out(), n).out(); -} - -void EthereumHost::restorePeers(bytesConstRef _b) -{ - for (auto i: RLP(_b)) - { - auto k = (Public)i[2]; - if (!m_incomingPeers.count(k)) - { - m_incomingPeers.insert(make_pair(k, make_pair(bi::tcp::endpoint(bi::address_v4(i[0].toArray()), i[1].toInt()), 0))); - m_freePeers.push_back(k); - } - } -} diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 9a9f83b1b..f92456e8b 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -30,83 +30,37 @@ #include #include #include +#include #include "CommonNet.h" -namespace ba = boost::asio; -namespace bi = boost::asio::ip; +#include "EthereumSession.h" namespace eth { class RLPStream; class TransactionQueue; -class BlockQueue; /** * @brief The EthereumHost class * @warning None of this is thread-safe. You have been warned. */ -class EthereumHost +class EthereumHost: public HostCapability { - friend class EthereumSession; + friend class EthereumPeer; public: - /// Start server, listening for connections on the given port. - EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, unsigned short _port, NodeMode _m = NodeMode::Full, std::string const& _publicAddress = std::string(), bool _upnp = true); - /// Start server, listening for connections on a system-assigned port. - EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, NodeMode _m = NodeMode::Full, std::string const& _publicAddress = std::string(), bool _upnp = true); /// Start server, but don't listen. - EthereumHost(std::string const& _clientVersion, BlockChain const& _ch, u256 _networkId, NodeMode _m = NodeMode::Full); + EthereumHost(BlockChain const& _ch, u256 _networkId); /// Will block on network process events. - ~EthereumHost(); + virtual ~EthereumHost(); - /// Closes all peers. - void disconnectPeers(); - - static unsigned protocolVersion(); - u256 networkId() { return m_networkId; } - - /// Connect to a peer explicitly. - void connect(std::string const& _addr, unsigned short _port = 30303) noexcept; - void connect(bi::tcp::endpoint const& _ep); + unsigned protocolVersion() const { return c_protocolVersion; } + u256 networkId() const { return m_networkId; } /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. bool sync(TransactionQueue&, BlockQueue& _bc); - /// Conduct I/O, polling, syncing, whatever. - /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. - /// This won't touch alter the blockchain. - void process() { if (isInitialised()) m_ioService.poll(); } - - /// @returns true iff we have the a peer of the given id. - bool havePeer(Public _id) const; - - /// Set ideal number of peers. - void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } - - /// Set the mode of operation on the network. - void setMode(NodeMode _m) { m_mode = _m; } - - /// Get peer information. - std::vector peers(bool _updatePing = false) const; - - /// Get number of peers connected; equivalent to, but faster than, peers().size(). - size_t peerCount() const { Guard l(x_peers); return m_peers.size(); } - - /// Ping the peers, to update the latency information. - void pingAll(); - - /// Get the port we're listening on currently. - unsigned short listenPort() const { return m_public.port(); } - - /// Serialise the set of known peers. - bytes savePeers() const; - - /// Deserialise the data and populate the set of known peers. - void restorePeers(bytesConstRef _b); - - void registerPeer(std::shared_ptr _s); - private: /// Session wants to pass us a block that we might not have. /// @returns true if we didn't have it. @@ -115,16 +69,7 @@ private: void noteHaveChain(std::shared_ptr const& _who); /// Called when the peer can no longer provide us with any needed blocks. void noteDoneBlocks(); - /// Called when the session has provided us with a new peer we can connect to. - void noteNewPeers() {} - - void seal(bytes& _b); - void populateAddresses(); - void determinePublic(std::string const& _publicAddress, bool _upnp); - void ensureAccepting(); - void growPeers(); - void prunePeers(); void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); @@ -135,35 +80,17 @@ private: /// Check to see if the network peer-state initialisation has happened. virtual bool isInitialised() const { return m_latestBlockSent; } + /// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first. bool ensureInitialised(TransactionQueue& _tq); - std::map potentialPeers(); - - std::string m_clientVersion; - NodeMode m_mode = NodeMode::Full; - - unsigned short m_listenPort; - BlockChain const* m_chain = nullptr; - ba::io_service m_ioService; - bi::tcp::acceptor m_acceptor; - bi::tcp::socket m_socket; - - UPnP* m_upnp = nullptr; - bi::tcp::endpoint m_public; - KeyPair m_key; u256 m_networkId; - mutable std::mutex x_peers; - mutable std::map> m_peers; // mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. - mutable std::recursive_mutex m_incomingLock; std::vector m_incomingTransactions; std::vector m_incomingBlocks; - std::map> m_incomingPeers; - std::vector m_freePeers; mutable std::mutex x_blocksNeeded; u256 m_totalDifficultyOfNeeded; @@ -172,14 +99,6 @@ private: h256 m_latestBlockSent; std::set m_transactionsSent; - - std::chrono::steady_clock::time_point m_lastPeersRequest; - unsigned m_idealPeerCount = 5; - - std::vector m_addresses; - std::vector m_peerAddresses; - - bool m_accepting = false; }; } diff --git a/libethereum/EthereumSession.cpp b/libethereum/EthereumSession.cpp index a04ccf0a3..b90665b4b 100644 --- a/libethereum/EthereumSession.cpp +++ b/libethereum/EthereumSession.cpp @@ -31,29 +31,55 @@ using namespace eth; #define clogS(X) eth::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " -EthereumSession::EthereumSession(EthereumHost* _s, bi::tcp::socket _socket, u256 _rNId, bi::address _peerAddress, unsigned short _peerPort): - m_server(_s), - m_socket(std::move(_socket)), - m_reqNetworkId(_rNId), - m_listenPort(_peerPort), - m_rating(0) +EthereumPeer::EthereumPeer(PeerSession* _s, HostCapability* _h): PeerCapability(_s, _h) { - m_disconnect = std::chrono::steady_clock::time_point::max(); - m_connect = std::chrono::steady_clock::now(); - m_info = PeerInfo({"?", _peerAddress.to_string(), m_listenPort, std::chrono::steady_clock::duration(0)}); + sendStatus(); } -EthereumSession::~EthereumSession() +EthereumPeer::~EthereumPeer() { giveUpOnFetch(); +} + +void EthereumPeer::sendStatus() +{ + RLPStream s; + m_session->prep(s); + s.appendList(9) << StatusPacket + << hostCapability()->protocolVersion() + << hostCapability()->networkId() + << hostCapability()->m_chain->details().totalDifficulty + << hostCapability()->m_chain->currentHash() + << hostCapability()->m_chain->genesisHash(); + sealAndSend(s); +} + +void EthereumPeer::startInitialSync() +{ + // Grab trsansactions off them. + { + RLPStream s; + prep(s).appendList(1); + s << GetTransactionsPacket; + sealAndSend(s); + } + + h256 c = m_server->m_chain->currentHash(); + uint n = m_server->m_chain->number(); + u256 td = max(m_server->m_chain->details().totalDifficulty, m_server->m_totalDifficultyOfNeeded); - // Read-chain finished for one reason or another. - try + clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << m_server->m_chain->details().totalDifficulty << "," << m_server->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; + if (td > m_totalDifficulty) + return; // All good - we have the better chain. + + // Our chain isn't better - grab theirs. { - if (m_socket.is_open()) - m_socket.close(); + RLPStream s; + prep(s).appendList(3); + s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk; + m_neededBlocks = h256s(1, m_latestHash); + sealAndSend(s); } - catch (...){} } inline string toString(h256s const& _bs) @@ -66,7 +92,7 @@ inline string toString(h256s const& _bs) return out.str(); } -void EthereumSession::giveUpOnFetch() +void EthereumPeer::giveUpOnFetch() { clogS(NetNote) << "GIVE UP FETCH; can't get " << toString(m_askedBlocks); if (m_askedBlocks.size()) @@ -83,148 +109,37 @@ void EthereumSession::giveUpOnFetch() } } -bi::tcp::endpoint EthereumSession::endpoint() const +bool EthereumPeer::interpret(RLP const& _r) { - if (m_socket.is_open()) - try - { - return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_listenPort); - } - catch (...){} - - return bi::tcp::endpoint(); -} - -bool EthereumSession::interpret(RLP const& _r) -{ - clogS(NetRight) << _r; switch (_r[0].toInt()) { - case HelloPacket: + case StatusPacket: { m_protocolVersion = _r[1].toInt(); m_networkId = _r[2].toInt(); - auto clientVersion = _r[3].toString(); - m_caps = _r[4].toInt(); - m_listenPort = _r[5].toInt(); - m_id = _r[6].toHash(); - m_totalDifficulty = _r[7].toInt(); - m_latestHash = _r[8].toHash(); - auto genesisHash = _r[9].toHash(); - - clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort; + m_totalDifficulty = _r[3].toInt(); + m_latestHash = _r[4].toHash(); + auto genesisHash = _r[5].toHash(); - if (m_server->havePeer(m_id)) - { - // Already connected. - cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint(); - disconnect(DuplicatePeer); - return false; - } + clogS(NetMessageSummary) << "Status: " << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); if (genesisHash != m_server->m_chain->genesisHash()) - { - disconnect(WrongGenesis); - return false; - } + disable("Invalid genesis hash"); + if (m_protocolVersion != EthereumHost::protocolVersion()) + disable("Invalid protocol version."); + if (m_networkId != m_server->networkId() || !m_id) + disable("Invalid network identifier."); - if (m_protocolVersion != EthereumHost::protocolVersion() || m_networkId != m_server->networkId() || !m_id) - { - disconnect(IncompatibleProtocol); - return false; - } - try - { m_info = PeerInfo({clientVersion, m_socket.remote_endpoint().address().to_string(), m_listenPort, std::chrono::steady_clock::duration()}); } - catch (...) - { - disconnect(BadProtocol); - return false; - } - - m_server->registerPeer(shared_from_this()); startInitialSync(); - - // Grab trsansactions off them. - { - RLPStream s; - prep(s).appendList(1); - s << GetTransactionsPacket; - sealAndSend(s); - } - break; - } - case DisconnectPacket: - { - string reason = "Unspecified"; - if (_r[1].isInt()) - reason = reasonOf((DisconnectReason)_r[1].toInt()); - - clogS(NetMessageSummary) << "Disconnect (reason: " << reason << ")"; - if (m_socket.is_open()) - clogS(NetNote) << "Closing " << m_socket.remote_endpoint(); - else - clogS(NetNote) << "Remote closed."; - m_socket.close(); - return false; - } - case PingPacket: - { - clogS(NetTriviaSummary) << "Ping"; - RLPStream s; - sealAndSend(prep(s).appendList(1) << PongPacket); break; } - case PongPacket: - m_info.lastPing = std::chrono::steady_clock::now() - m_ping; - clogS(NetTriviaSummary) << "Latency: " << chrono::duration_cast(m_info.lastPing).count() << " ms"; - break; - case GetPeersPacket: + case GetTransactionsPacket: { - clogS(NetTriviaSummary) << "GetPeers"; - auto peers = m_server->potentialPeers(); - RLPStream s; - prep(s).appendList(peers.size() + 1); - s << PeersPacket; - for (auto i: peers) - { - clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second; - s.appendList(3) << bytesConstRef(i.second.address().to_v4().to_bytes().data(), 4) << i.second.port() << i.first; - } - sealAndSend(s); + if (m_server->m_mode == NodeMode::PeerServer) + break; + m_requireTransactions = true; break; } - case PeersPacket: - clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; - for (unsigned i = 1; i < _r.itemCount(); ++i) - { - bi::address_v4 peerAddress(_r[i][0].toHash>().asArray()); - auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); - Public id = _r[i][2].toHash(); - if (isPrivateAddress(peerAddress)) - goto CONTINUE; - - clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")"; - - // check that it's not us or one we already know: - if (id && (m_server->m_key.pub() == id || m_server->havePeer(id) || m_server->m_incomingPeers.count(id))) - goto CONTINUE; - - // check that we're not already connected to addr: - if (!ep.port()) - goto CONTINUE; - for (auto i: m_server->m_addresses) - if (ep.address() == i && ep.port() == m_server->listenPort()) - goto CONTINUE; - for (auto i: m_server->m_incomingPeers) - if (i.second.first == ep) - goto CONTINUE; - m_server->m_incomingPeers[id] = make_pair(ep, 0); - m_server->m_freePeers.push_back(id); - m_server->noteNewPeers(); - clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; - CONTINUE:; - } - break; case TransactionsPacket: if (m_server->m_mode == NodeMode::PeerServer) break; @@ -350,20 +265,13 @@ bool EthereumSession::interpret(RLP const& _r) continueGettingChain(); break; } - case GetTransactionsPacket: - { - if (m_server->m_mode == NodeMode::PeerServer) - break; - m_requireTransactions = true; - break; - } default: - break; + return false; } return true; } -void EthereumSession::restartGettingChain() +void EthereumPeer::restartGettingChain() { if (m_askedBlocks.size()) { @@ -375,7 +283,7 @@ void EthereumSession::restartGettingChain() ensureGettingChain(); } -void EthereumSession::ensureGettingChain() +void EthereumPeer::ensureGettingChain() { if (m_askedBlocks.size()) return; // Already asked & waiting for some. @@ -383,7 +291,7 @@ void EthereumSession::ensureGettingChain() continueGettingChain(); } -void EthereumSession::continueGettingChain() +void EthereumPeer::continueGettingChain() { if (!m_askedBlocks.size()) m_askedBlocks = m_server->neededBlocks(m_failedBlocks); @@ -403,258 +311,3 @@ void EthereumSession::continueGettingChain() m_server->noteDoneBlocks(); } } - -void EthereumSession::ping() -{ - RLPStream s; - sealAndSend(prep(s).appendList(1) << PingPacket); - m_ping = std::chrono::steady_clock::now(); -} - -void EthereumSession::getPeers() -{ - RLPStream s; - sealAndSend(prep(s).appendList(1) << GetPeersPacket); -} - -RLPStream& EthereumSession::prep(RLPStream& _s) -{ - return _s.appendRaw(bytes(8, 0)); -} - -void EthereumSession::sealAndSend(RLPStream& _s) -{ - bytes b; - _s.swapOut(b); - m_server->seal(b); - sendDestroy(b); -} - -bool EthereumSession::checkPacket(bytesConstRef _msg) -{ - if (_msg.size() < 8) - return false; - if (!(_msg[0] == 0x22 && _msg[1] == 0x40 && _msg[2] == 0x08 && _msg[3] == 0x91)) - return false; - uint32_t len = ((_msg[4] * 256 + _msg[5]) * 256 + _msg[6]) * 256 + _msg[7]; - if (_msg.size() != len + 8) - return false; - RLP r(_msg.cropped(8)); - if (r.actualSize() != len) - return false; - return true; -} - -void EthereumSession::sendDestroy(bytes& _msg) -{ - clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(8)); - - if (!checkPacket(bytesConstRef(&_msg))) - { - cwarn << "INVALID PACKET CONSTRUCTED!"; - } - - bytes buffer = bytes(std::move(_msg)); - writeImpl(buffer); -} - -void EthereumSession::send(bytesConstRef _msg) -{ - clogS(NetLeft) << RLP(_msg.cropped(8)); - - if (!checkPacket(_msg)) - { - cwarn << "INVALID PACKET CONSTRUCTED!"; - } - - bytes buffer = bytes(_msg.toBytes()); - writeImpl(buffer); -} - -void EthereumSession::writeImpl(bytes& _buffer) -{ -// cerr << (void*)this << " writeImpl" << endl; - if (!m_socket.is_open()) - return; - - lock_guard l(m_writeLock); - m_writeQueue.push_back(_buffer); - if (m_writeQueue.size() == 1) - write(); -} - -void EthereumSession::write() -{ -// cerr << (void*)this << " write" << endl; - lock_guard l(m_writeLock); - if (m_writeQueue.empty()) - return; - - const bytes& bytes = m_writeQueue[0]; - auto self(shared_from_this()); - ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) - { -// cerr << (void*)this << " write.callback" << endl; - - // must check queue, as write callback can occur following dropped() - if (ec) - { - cwarn << "Error sending: " << ec.message(); - dropped(); - } - else - { - m_writeQueue.pop_front(); - write(); - } - }); -} - -void EthereumSession::dropped() -{ -// cerr << (void*)this << " dropped" << endl; - if (m_socket.is_open()) - try - { - clogS(NetConnect) << "Closing " << m_socket.remote_endpoint(); - m_socket.close(); - } - catch (...) {} -} - -void EthereumSession::disconnect(int _reason) -{ - clogS(NetConnect) << "Disconnecting (reason:" << reasonOf((DisconnectReason)_reason) << ")"; - if (m_socket.is_open()) - { - if (m_disconnect == chrono::steady_clock::time_point::max()) - { - RLPStream s; - prep(s); - s.appendList(2) << DisconnectPacket << _reason; - sealAndSend(s); - m_disconnect = chrono::steady_clock::now(); - } - else - dropped(); - } -} - -void EthereumSession::start() -{ - RLPStream s; - prep(s); - s.appendList(10) << HelloPacket - << (uint)EthereumHost::protocolVersion() - << m_server->networkId() - << m_server->m_clientVersion - << (m_server->m_mode == NodeMode::Full ? 0x07 : m_server->m_mode == NodeMode::PeerServer ? 0x01 : 0) - << m_server->m_public.port() - << m_server->m_key.pub() - << m_server->m_chain->details().totalDifficulty - << m_server->m_chain->currentHash() - << m_server->m_chain->genesisHash(); - sealAndSend(s); - ping(); - getPeers(); - - doRead(); -} - -void EthereumSession::startInitialSync() -{ - h256 c = m_server->m_chain->currentHash(); - uint n = m_server->m_chain->number(); - u256 td = max(m_server->m_chain->details().totalDifficulty, m_server->m_totalDifficultyOfNeeded); - - clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << m_server->m_chain->details().totalDifficulty << "," << m_server->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; - if (td > m_totalDifficulty) - return; // All good - we have the better chain. - - // Our chain isn't better - grab theirs. - RLPStream s; - prep(s).appendList(3); - s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk; - m_neededBlocks = h256s(1, m_latestHash); - sealAndSend(s); -} - -void EthereumSession::doRead() -{ - // ignore packets received while waiting to disconnect - if (chrono::steady_clock::now() - m_disconnect > chrono::seconds(0)) - return; - - auto self(shared_from_this()); - m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length) - { - // If error is end of file, ignore - if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) - { - // got here with length of 1241... - cwarn << "Error reading: " << ec.message(); - dropped(); - } - else if (ec && length == 0) - { - return; - } - else - { - try - { - m_incoming.resize(m_incoming.size() + length); - memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length); - while (m_incoming.size() > 8) - { - if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) - { - cwarn << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4)); - disconnect(BadProtocol); - return; - } - else - { - uint32_t len = fromBigEndian(bytesConstRef(m_incoming.data() + 4, 4)); - uint32_t tlen = len + 8; - if (m_incoming.size() < tlen) - break; - - // enough has come in. - auto data = bytesConstRef(m_incoming.data(), tlen); - if (!checkPacket(data)) - { - cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl; - cwarn << "INVALID MESSAGE RECEIVED"; - disconnect(BadProtocol); - return; - } - else - { - RLP r(data.cropped(8)); - if (!interpret(r)) - { - // error - dropped(); - return; - } - } - memmove(m_incoming.data(), m_incoming.data() + tlen, m_incoming.size() - tlen); - m_incoming.resize(m_incoming.size() - tlen); - } - } - doRead(); - } - catch (Exception const& _e) - { - clogS(NetWarn) << "ERROR: " << _e.description(); - dropped(); - } - catch (std::exception const& _e) - { - clogS(NetWarn) << "ERROR: " << _e.what(); - dropped(); - } - } - }); -} diff --git a/libethereum/EthereumSession.h b/libethereum/EthereumSession.h index 2a7e31155..1b10c71fe 100644 --- a/libethereum/EthereumSession.h +++ b/libethereum/EthereumSession.h @@ -29,34 +29,44 @@ #include #include #include "CommonNet.h" +#include "EthereumHost.h" namespace eth { +class WhisperSession: public PeerSession +{ +public: + WhisperSession(); + virtual ~WhisperSession(); + + static std::string name() { return "shh"; } + +private: + virtual bool interpret(RLP const&) {} +}; + /** * @brief The EthereumSession class * @todo Document fully. */ -class EthereumSession: public std::enable_shared_from_this +class EthereumPeer: public PeerCapability { friend class EthereumHost; public: - EthereumSession(EthereumHost* _server, bi::tcp::socket _socket, u256 _rNId, bi::address _peerAddress, unsigned short _peerPort = 0); - ~EthereumSession(); - - void start(); - void disconnect(int _reason); + EthereumPeer(PeerSession* _s, HostCapability* _h); + virtual ~EthereumPeer(); - void ping(); + static std::string name() { return "eth"; } - bool isOpen() const { return m_socket.is_open(); } - - bi::tcp::endpoint endpoint() const; ///< for other peers to connect to. + EthereumHost* hostCapability() const { return static_cast(PeerCapability::hostCapability()); } private: + virtual bool interpret(RLP const& _r); + + void sendStatus(); void startInitialSync(); - void getPeers(); /// Ensure that we are waiting for a bunch of blocks from our peer. void ensureGettingChain(); @@ -67,36 +77,10 @@ private: void giveUpOnFetch(); - void dropped(); - void doRead(); - void doWrite(std::size_t length); - bool interpret(RLP const& _r); - - /// @returns true iff the _msg forms a valid message for sending or receiving on the network. - static bool checkPacket(bytesConstRef _msg); - - static RLPStream& prep(RLPStream& _s); - void sealAndSend(RLPStream& _s); - void sendDestroy(bytes& _msg); - void send(bytesConstRef _msg); - void writeImpl(bytes& _buffer); - void write(); - EthereumHost* m_server; + EthereumHost* m_host; - std::recursive_mutex m_writeLock; - std::deque m_writeQueue; - - bi::tcp::socket m_socket; - std::array m_data; - PeerInfo m_info; - Public m_id; - - bytes m_incoming; uint m_protocolVersion; u256 m_networkId; - u256 m_reqNetworkId; - unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers. - uint m_caps; h256 m_latestHash; ///< Peer's latest block's hash. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. @@ -106,17 +90,10 @@ private: h256Set m_askedBlocks; ///< The blocks for which we sent the last GetBlocks for but haven't received a corresponding Blocks. bool m_askedBlocksChanged = true; - std::chrono::steady_clock::time_point m_ping; - std::chrono::steady_clock::time_point m_connect; - std::chrono::steady_clock::time_point m_disconnect; - - uint m_rating; bool m_requireTransactions; std::set m_knownBlocks; std::set m_knownTransactions; - - bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand. }; } diff --git a/libethnet/Common.cpp b/libethnet/Common.cpp index 5958ee9df..6224f93ca 100644 --- a/libethnet/Common.cpp +++ b/libethnet/Common.cpp @@ -59,3 +59,8 @@ std::string eth::reasonOf(DisconnectReason _r) } } +void PeerCapability::disable(std::string const& _problem) +{ + clogS(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem; + m_enabled = false; +} diff --git a/libethnet/Common.h b/libethnet/Common.h index 64bc20370..c70e42aaa 100644 --- a/libethnet/Common.h +++ b/libethnet/Common.h @@ -93,12 +93,16 @@ class PeerCapability; class HostCapabilityFace { + friend class PeerHost; + public: HostCapabilityFace(PeerHost*) {} virtual ~HostCapabilityFace() {} +protected: virtual std::string name() const = 0; virtual PeerCapability* newPeerCapability(PeerSession* _s) = 0; + virtual bool isInitialised() const = 0; }; template @@ -113,8 +117,9 @@ public: PeerHost* host() const { return m_host; } protected: + virtual bool isInitialised() const = 0; virtual std::string name() const { return PeerCap::name(); } - virtual PeerCapability* newPeerCapability(PeerSession* _s) { return new PeerCap(_s); } + virtual PeerCapability* newPeerCapability(PeerSession* _s) { return new PeerCap(_s, this); } private: PeerHost* m_host; @@ -125,19 +130,24 @@ class PeerCapability friend class PeerSession; public: - PeerCapability(PeerSession* _s): m_session(_s) {} + PeerCapability(PeerSession* _s, HostCapability* _h): m_session(_s), m_host(_h) {} virtual ~PeerCapability() {} /// Must return the capability name. static std::string name() { return ""; } PeerSession* session() const { return m_session; } + HostCapability* hostCapability() const { return m_host; } protected: virtual bool interpret(RLP const&) = 0; + void disable(std::string const& _problem); + private: PeerSession* m_session; + HostCapability* m_host; + bool m_enabled = true; }; } diff --git a/libethnet/PeerHost.cpp b/libethnet/PeerHost.cpp index d5ca05cc4..b5627719f 100644 --- a/libethnet/PeerHost.cpp +++ b/libethnet/PeerHost.cpp @@ -450,6 +450,16 @@ std::vector PeerHost::peers(bool _updatePing) const return ret; } +void PeerHost::process() +{ + for (auto const& i: m_capabilities) + if (!i->isInitialised()) + return false; + growPeers(); + prunePeers(); + m_ioService.poll(); +} + void PeerHost::pingAll() { Guard l(x_peers); diff --git a/libethnet/PeerHost.h b/libethnet/PeerHost.h index f44d202eb..00c5beb88 100644 --- a/libethnet/PeerHost.h +++ b/libethnet/PeerHost.h @@ -41,6 +41,10 @@ class RLPStream; class TransactionQueue; class BlockQueue; +/** + * @brief The PeerHost class + * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. + */ class PeerHost { friend class PeerSession; @@ -72,7 +76,7 @@ public: /// Conduct I/O, polling, syncing, whatever. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. /// This won't touch alter the blockchain. - void process() { if (isInitialised()) m_ioService.poll(); } + void process(); /// @returns true iff we have the a peer of the given id. bool havePeer(Public _id) const; @@ -107,7 +111,6 @@ protected: /// Called when the session has provided us with a new peer we can connect to. void noteNewPeers() {} - virtual bool isInitialised() const { return true; } void seal(bytes& _b); void populateAddresses(); void determinePublic(std::string const& _publicAddress, bool _upnp); @@ -133,8 +136,7 @@ protected: mutable std::mutex x_peers; mutable std::map> m_peers; // mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method. - mutable std::recursive_mutex m_incomingLock; - std::map> m_incomingPeers; + std::map> m_incomingPeers; // TODO: does this need a lock? std::vector m_freePeers; std::chrono::steady_clock::time_point m_lastPeersRequest; diff --git a/libethnet/PeerSession.cpp b/libethnet/PeerSession.cpp index c2b8ba757..dfa5d6195 100644 --- a/libethnet/PeerSession.cpp +++ b/libethnet/PeerSession.cpp @@ -180,7 +180,7 @@ bool PeerSession::interpret(RLP const& _r) break; default: for (auto const& i: m_capabilities) - if (i->interpret(_r)) + if (i->m_enabled && i->interpret(_r)) return true; return false; } diff --git a/libethnet/PeerSession.h b/libethnet/PeerSession.h index f51511e04..5919bf11d 100644 --- a/libethnet/PeerSession.h +++ b/libethnet/PeerSession.h @@ -54,7 +54,6 @@ public: bi::tcp::endpoint endpoint() const; ///< for other peers to connect to. -protected: static RLPStream& prep(RLPStream& _s); void sealAndSend(RLPStream& _s); void sendDestroy(bytes& _msg);