Browse Source

send/receive messages (not yet interepreted)

cl-refactor
subtly 10 years ago
parent
commit
2a1ea35f7f
  1. 13
      libp2p/NodeTable.cpp
  2. 14
      libp2p/NodeTable.h
  3. 1
      libp2p/UDP.h
  4. 52
      test/net.cpp

13
libp2p/NodeTable.cpp

@ -24,9 +24,10 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
NodeTable::NodeTable(ba::io_service& _io, uint16_t _port): NodeTable::NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _listenPort):
m_node(Node(Address(), Public(), bi::udp::endpoint())), m_node(Node(_alias.address(), _alias.pub(), bi::udp::endpoint())),
m_socket(new nodeSocket(_io, *this, _port)), m_secret(_alias.sec()),
m_socket(new NodeSocket(_io, *this, _listenPort)),
m_socketPtr(m_socket.get()), m_socketPtr(m_socket.get()),
m_io(_io), m_io(_io),
m_bucketRefreshTimer(m_io), m_bucketRefreshTimer(m_io),
@ -281,7 +282,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
{ {
RLP rlp(_packet); RLP rlp(_packet);
clog(NodeTableNote) << "Received message from " << _from.address().to_string() << ":" << _from.port(); clog(NodeTableNote) << "Received X from " << _from.address().to_string() << ":" << _from.port();
// whenever a pong is received, first check if it's in m_evictions, if so, remove it // whenever a pong is received, first check if it's in m_evictions, if so, remove it
Guard l(x_evictions); Guard l(x_evictions);
@ -322,10 +323,10 @@ void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec) void NodeTable::doRefreshBuckets(boost::system::error_code const& _ec)
{ {
clog(NodeTableNote) << "refreshing buckets";
if (_ec) if (_ec)
return; return;
clog(NodeTableNote) << "refreshing buckets";
bool connected = m_socketPtr->isOpen(); bool connected = m_socketPtr->isOpen();
bool refreshed = false; bool refreshed = false;
if (connected) if (connected)

14
libp2p/NodeTable.h

@ -141,9 +141,9 @@ struct Neighbors: RLPXDatagram
*/ */
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable> class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{ {
using nodeSocket = UDPSocket<NodeTable, 1280>; using NodeSocket = UDPSocket<NodeTable, 1280>;
using timePoint = std::chrono::steady_clock::time_point; using TimePoint = std::chrono::steady_clock::time_point;
using EvictionTimeout = std::pair<std::pair<Address,timePoint>,Address>; using EvictionTimeout = std::pair<std::pair<Address,TimePoint>,Address>;
struct NodeDefaultEndpoint struct NodeDefaultEndpoint
{ {
@ -179,7 +179,7 @@ class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
struct NodeBucket struct NodeBucket
{ {
unsigned distance; unsigned distance;
timePoint modified; TimePoint modified;
std::list<std::weak_ptr<NodeEntry>> nodes; std::list<std::weak_ptr<NodeEntry>> nodes;
}; };
@ -206,7 +206,7 @@ public:
static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
NodeTable(ba::io_service& _io, uint16_t _port = s_defaultPort); NodeTable(ba::io_service& _io, KeyPair _alias, uint16_t _port = s_defaultPort);
~NodeTable(); ~NodeTable();
void join(); void join();
@ -268,8 +268,8 @@ protected:
Mutex x_evictions; Mutex x_evictions;
std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts. std::deque<EvictionTimeout> m_evictions; ///< Eviction timeouts.
std::shared_ptr<nodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. std::shared_ptr<NodeSocket> m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr.
nodeSocket* m_socketPtr; ///< Set to m_socket.get(). NodeSocket* m_socketPtr; ///< Set to m_socket.get().
ba::io_service& m_io; ///< Used by bucket refresh timer. ba::io_service& m_io; ///< Used by bucket refresh timer.
boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh.
boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions.

1
libp2p/UDP.h

@ -166,7 +166,6 @@ bool UDPSocket<Handler,MaxDatagramSize>::send(UDPDatagram const& _datagram)
Guard l(x_sendQ); Guard l(x_sendQ);
sendQ.push_back(_datagram); sendQ.push_back(_datagram);
clog(NoteChannel) << "qued datagram";
if (sendQ.size() == 1) if (sendQ.size() == 1)
doWrite(); doWrite();

52
test/net.cpp

@ -49,21 +49,10 @@ protected:
struct TestNodeTable: public NodeTable struct TestNodeTable: public NodeTable
{ {
void generateTestNodes(int _count = 10)
{
asserts(_count < 1000);
static uint16_t s_basePort = 30500;
m_testNodes.clear();
for (auto i = 0; i < _count; i++)
m_testNodes.push_back(make_pair(KeyPair::create(),s_basePort++));
}
std::vector<std::pair<KeyPair,unsigned>> m_testNodes; // keypair and port
/// Constructor /// Constructor
using NodeTable::NodeTable; using NodeTable::NodeTable;
void setup() void setup(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
{ {
/// Phase 1 test: populate with pings /// Phase 1 test: populate with pings
/// Phase 2 test: pre-populate *expected* ping-responses, send pings /// Phase 2 test: pre-populate *expected* ping-responses, send pings
@ -72,8 +61,7 @@ struct TestNodeTable: public NodeTable
uint16_t ourPort = 30300; uint16_t ourPort = 30300;
bi::udp::endpoint ourEndpoint(ourIp, ourPort); bi::udp::endpoint ourEndpoint(ourIp, ourPort);
generateTestNodes(); for (auto& n: _testNodes)
for (auto& n: m_testNodes)
ping(bi::udp::endpoint(ourIp, n.second)); ping(bi::udp::endpoint(ourIp, n.second));
// wait 1ms between each send // wait 1ms between each send
@ -94,8 +82,32 @@ struct TestNodeTable: public NodeTable
*/ */
struct TestNodeTableHost: public TestHost struct TestNodeTableHost: public TestHost
{ {
TestNodeTableHost(): nodeTable(new TestNodeTable(m_io)) {}; TestNodeTableHost(): m_alias(KeyPair::create()), nodeTable(new TestNodeTable(m_io, m_alias)) {};
void generateTestNodes(int _count = 10)
{
asserts(_count < 1000);
static uint16_t s_basePort = 30500;
m_testNodes.clear();
for (auto i = 0; i < _count; i++)
{
KeyPair k = KeyPair::create();
m_testNodes.push_back(make_pair(k,s_basePort+i));
testNodes.push_back(make_shared<TestNodeTable>(m_io,k,s_basePort+i));
}
}
std::vector<std::pair<KeyPair,unsigned>> m_testNodes; // keypair and port
void setup()
{
generateTestNodes();
nodeTable->setup(m_testNodes);
}
KeyPair m_alias;
shared_ptr<TestNodeTable> nodeTable; shared_ptr<TestNodeTable> nodeTable;
std::vector<shared_ptr<TestNodeTable>> testNodes;
}; };
class TestUDPSocket: UDPSocketEvents, public TestHost class TestUDPSocket: UDPSocketEvents, public TestHost
@ -113,11 +125,11 @@ public:
BOOST_AUTO_TEST_CASE(kademlia) BOOST_AUTO_TEST_CASE(kademlia)
{ {
// TestNodeTableHost node; TestNodeTableHost node;
// node.start(); node.start();
// node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
// node.nodeTable->setup(); node.setup();
// sleep(1); sleep(1);
} }
BOOST_AUTO_TEST_CASE(test_txrx_one) BOOST_AUTO_TEST_CASE(test_txrx_one)

Loading…
Cancel
Save