Browse Source

udp != tcp. history-commit.

cl-refactor
subtly 10 years ago
parent
commit
71bf6e7ede
  1. 30
      libp2p/Host.cpp
  2. 6
      libp2p/Host.h
  3. 8
      libp2p/Network.cpp
  4. 102
      libp2p/Network.h
  5. 2
      libp2p/Session.cpp
  6. 4
      libp2p/Session.h
  7. 33
      test/kademlia.cpp
  8. 0
      test/net.cpp

30
libp2p/Host.cpp

@ -43,7 +43,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_netPrefs(_n),
m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2),
m_acceptorV4(m_ioService),
m_tcp4Acceptor(m_ioService),
m_key(KeyPair::create())
{
for (auto address: m_ifAddresses)
@ -95,9 +95,9 @@ void Host::doneWorking()
m_ioService.reset();
// shutdown acceptor
m_acceptorV4.cancel();
if (m_acceptorV4.is_open())
m_acceptorV4.close();
m_tcp4Acceptor.cancel();
if (m_tcp4Acceptor.is_open())
m_tcp4Acceptor.close();
// There maybe an incoming connection which started but hasn't finished.
// Wait for acceptor to end itself instead of assuming it's complete.
@ -280,7 +280,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
{
if (!m_peerAddresses.count(reqpublicaddr))
m_peerAddresses.insert(reqpublicaddr);
m_public = reqpublic;
m_tcpPublic = reqpublic;
return;
}
@ -288,7 +288,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
for (auto addr: m_peerAddresses)
if (addr.is_v4() && !isPrivateAddress(addr))
{
m_public = bi::tcp::endpoint(*m_peerAddresses.begin(), m_listenPort);
m_tcpPublic = bi::tcp::endpoint(*m_peerAddresses.begin(), m_listenPort);
return;
}
@ -301,7 +301,7 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
{
if (!m_peerAddresses.count(upnpep.address()))
m_peerAddresses.insert(upnpep.address());
m_public = upnpep;
m_tcpPublic = upnpep;
return;
}
}
@ -312,12 +312,12 @@ void Host::determinePublic(string const& _publicAddress, bool _upnp)
for (auto addr: m_peerAddresses)
if (addr.is_v4() && isPrivateAddress(addr))
{
m_public = bi::tcp::endpoint(addr, m_listenPort);
m_tcpPublic = bi::tcp::endpoint(addr, m_listenPort);
return;
}
// otherwise address is unspecified
m_public = bi::tcp::endpoint(bi::address(), m_listenPort);
m_tcpPublic = bi::tcp::endpoint(bi::address(), m_listenPort);
}
void Host::runAcceptor()
@ -326,10 +326,10 @@ void Host::runAcceptor()
if (m_run && !m_accepting)
{
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_public << ")";
clog(NetConnect) << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true;
m_socket.reset(new bi::tcp::socket(m_ioService));
m_acceptorV4.async_accept(*m_socket, [=](boost::system::error_code ec)
m_tcp4Acceptor.async_accept(*m_socket, [=](boost::system::error_code ec)
{
bool success = false;
if (!ec)
@ -656,7 +656,7 @@ void Host::startedWorking()
}
// try to open acceptor (todo: ipv6)
m_listenPort = Network::listen4(m_acceptorV4, m_netPrefs.listenPort);
m_listenPort = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs.listenPort);
// start capability threads
for (auto const& h: m_capabilities)
@ -674,8 +674,8 @@ void Host::startedWorking()
// if m_public address is valid then add us to node list
// todo: abstract empty() and emplace logic
if (!m_public.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_public, Origin::Perfect, false);
if (!m_tcpPublic.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_tcpPublic, Origin::Perfect, false);
clog(NetNote) << "Id:" << id().abridged();
@ -739,7 +739,7 @@ void Host::restoreNodes(bytesConstRef _b)
{
auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_public, Origin::Perfect, false, oldId);
noteNode(id(), m_tcpPublic, Origin::Perfect, false, oldId);
for (auto i: r[2])
{

6
libp2p/Host.h

@ -152,7 +152,7 @@ public:
void pingAll();
/// Get the port we're listening on currently.
unsigned short listenPort() const { return m_public.port(); }
unsigned short listenPort() const { return m_tcpPublic.port(); }
/// Serialise the set of known peers.
bytes saveNodes() const;
@ -219,7 +219,7 @@ private:
int m_listenPort = -1; ///< What port are we listening on. -1 means binding failed or acceptor hasn't been initialized.
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_acceptorV4; ///< Listening acceptor.
bi::tcp::acceptor m_tcp4Acceptor; ///< Listening acceptor.
std::unique_ptr<bi::tcp::socket> m_socket; ///< Listening socket.
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
@ -229,7 +229,7 @@ private:
std::set<Node*> m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptr<Node>const&).
Mutex x_pendingNodeConns;
bi::tcp::endpoint m_public; ///< Our public listening endpoint.
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
bool m_hadNewNodes = false;

8
libp2p/Network.cpp

@ -38,6 +38,12 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
template <class T>
Socket<T>::Socket(SocketEventFace* _seface): m_eventDelegate(_seface), m_socket(m_eventDelegate->ioService()) {}
template <class T>
Socket<T>::Socket(SocketEventFace* _seface, endpointType _endpoint): m_eventDelegate(_seface), m_socket(m_eventDelegate->ioService(), _endpoint) {}
std::vector<bi::address> Network::getInterfaceAddresses()
{
std::vector<bi::address> addresses;
@ -111,7 +117,7 @@ std::vector<bi::address> Network::getInterfaceAddresses()
return std::move(addresses);
}
int Network::listen4(bi::tcp::acceptor& _acceptor, unsigned short _listenPort)
int Network::tcp4Listen(bi::tcp::acceptor& _acceptor, unsigned short _listenPort)
{
int retport = -1;
for (unsigned i = 0; i < 2; ++i)

102
libp2p/Network.h

@ -22,13 +22,17 @@
#pragma once
#include <memory>
#include <vector>
#include <deque>
#include <libdevcore/RLP.h>
#include <libdevcore/Guards.h>
#include "Common.h"
namespace ba = boost::asio;
namespace bi = ba::ip;
namespace dev
{
namespace p2p
{
@ -42,6 +46,98 @@ struct NetworkPreferences
bool localNetworking = false;
};
struct Packet
{
bytes payload() const { return s.out(); }
bool required = false;
RLPStream s;
};
class SocketFace
{
virtual void send(Packet const& _msg) = 0;
};
class SocketEventFace;
/**
* @brief Generic Socket Interface
* Owners of sockets must outlive the socket.
* Boost ASIO uses lowercase template for udp/tcp, which is adopted here.
*/
template <class T>
class Socket: SocketFace, public std::enable_shared_from_this<Socket<T>>
{
public:
using socketType = typename T::socket;
using endpointType = typename T::endpoint;
Socket(SocketEventFace* _seface);
Socket(SocketEventFace* _seface, endpointType _endpoint);
protected:
void send(Packet const& _msg)
{
if (!m_started)
return;
Guard l(x_sendQ);
sendQ.push_back(_msg.payload());
if (sendQ.size() == 1 && !m_stopped)
doWrite();
}
void doWrite()
{
const bytes& bytes = sendQ[0];
auto self(Socket<T>::shared_from_this());
// boost::asio::async_write(m_socket, boost::asio::buffer(bytes), [this, self](boost::system::error_code _ec, std::size_t /*length*/)
// {
// if (_ec)
// return stopWithError(_ec);
// else
// {
// Guard l(x_sendQ);
// sendQ.pop_front();
// if (sendQ.empty())
// return;
// }
// doWrite();
// });
}
void stopWithError(boost::system::error_code _ec);
std::atomic<bool> m_stopped; ///< Set when connection is stopping or stopped. Handshake cannot occur unless m_stopped is true.
std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start() and shutdown(bool).
SocketEventFace* m_eventDelegate = nullptr;
Mutex x_sendQ;
std::deque<bytes> sendQ;
bytes recvBuffer;
size_t recvdBytes = 0;
socketType m_socket;
Mutex x_socketError; ///< Mutex for error which can occur from host or IO thread.
boost::system::error_code socketError; ///< Set when shut down due to error.
};
class SocketEventFace
{
public:
virtual ba::io_service& ioService() = 0;
virtual void onStopped(SocketFace*) = 0;
virtual void onReceive(SocketFace*, Packet&) = 0;
};
struct UDPSocket: public Socket<bi::udp>
{
UDPSocket(ba::io_service& _io, unsigned _port): Socket<bi::udp>(nullptr, bi::udp::endpoint(bi::udp::v4(), _port)) {}
~UDPSocket() { boost::system::error_code ec; m_socket.shutdown(bi::udp::socket::shutdown_both, ec); m_socket.close(); }
// bi::udp::socket m_socket;
};
/**
* @brief Network Class
* Static network operations and interface(s).
@ -53,8 +149,8 @@ public:
static std::vector<bi::address> getInterfaceAddresses();
/// Try to bind and listen on _listenPort, else attempt net-allocated port.
static int listen4(bi::tcp::acceptor& _acceptor, unsigned short _listenPort);
static int tcp4Listen(bi::tcp::acceptor& _acceptor, unsigned short _listenPort);
/// Return public endpoint of upnp interface. If successful o_upnpifaddr will be a private interface address and endpoint will contain public address and port.
static bi::tcp::endpoint traverseNAT(std::vector<bi::address> const& _ifAddresses, unsigned short _listenPort, bi::address& o_upnpifaddr);
};

2
libp2p/Session.cpp

@ -522,7 +522,7 @@ void Session::start()
<< m_server->protocolVersion()
<< m_server->m_clientVersion
<< m_server->caps()
<< m_server->m_public.port()
<< m_server->m_tcpPublic.port()
<< m_server->id();
sealAndSend(s);
ping();

4
libp2p/Session.h

@ -107,8 +107,8 @@ private:
mutable bi::tcp::socket m_socket; ///< Socket for the peer's connection. Mutable to ask for native_handle().
Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 65536> m_data; ///< Data buffer for the write queue.
bytes m_incoming; ///< The incoming read queue of bytes.
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes.
PeerInfo m_info; ///< Dynamic information about this peer.

33
test/kademlia.cpp

@ -0,0 +1,33 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file kademlia.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2014
* Basic networking tests
*/
#include <boost/test/unit_test.hpp>
#include <libp2p/Common.h>
using namespace std;
using namespace dev;
using namespace dev::p2p;
BOOST_AUTO_TEST_CASE(host_listen_udp4)
{
}

0
test/network.cpp → test/net.cpp

Loading…
Cancel
Save