
message signing and verification. shutdown io/thread before dealloc in nodetable/testnodetables.

cl-refactor
subtly committed 10 years ago
commit 4216162fc1
  1. libp2p/NodeTable.cpp (65 changes)
  2. libp2p/NodeTable.h (33 changes)
  3. libp2p/UDP.cpp (21 changes)
  4. libp2p/UDP.h (38 changes)
  5. test/net.cpp (34 changes)
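The headline change is that every discovery datagram is now signed: the first 65 bytes of a packet are a recoverable signature over the sha3 of the RLP payload that follows, and the receiver derives the sender's node id from it (see the libp2p/UDP.* and libp2p/NodeTable.cpp hunks below). A minimal sketch of that wire layout, reusing the dev::sign / dev::recover / sha3 calls that appear in the diff; the seal/open helpers and the include paths are illustrative, not part of the tree:

    // packet = signature(65 bytes) || rlp-payload
    #include <cstring>
    #include <libdevcore/Common.h>      // bytes, bytesConstRef
    #include <libdevcrypto/Common.h>    // Secret, Public, Signature, sign, recover
    #include <libdevcrypto/SHA3.h>      // sha3

    using namespace dev;

    // Hypothetical helper: prepend a recoverable signature to an RLP payload.
    bytes seal(Secret const& _k, bytes const& _rlp)
    {
        Signature sig = sign(_k, sha3(_rlp));                  // sign the payload hash
        bytes packet(65 + _rlp.size());
        memcpy(packet.data(), sig.data(), 65);                 // signature first...
        memcpy(packet.data() + 65, _rlp.data(), _rlp.size());  // ...then the RLP body
        return packet;
    }

    // Hypothetical helper: recover the sender's public key; a zero Public means
    // the signature did not verify (the same check onReceived() now performs).
    Public open(bytesConstRef _packet)
    {
        if (_packet.size() <= 65)
            return Public();
        bytesConstRef sig = _packet.cropped(0, 65);
        bytesConstRef rlp = _packet.cropped(65, _packet.size() - 65);
        return recover(*(Signature const*)sig.data(), sha3(rlp));
    }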

libp2p/NodeTable.cpp (65 changes)

@@ -45,7 +45,7 @@ NodeTable::~NodeTable()
     m_bucketRefreshTimer.cancel();
     m_socketPtr->disconnect();
 }

 void NodeTable::join()
 {
     doFindNode(m_node.id);
@@ -60,6 +60,16 @@ std::list<Address> NodeTable::nodes() const
     return std::move(nodes);
 }

+list<NodeTable::NodeEntry> NodeTable::state() const
+{
+    list<NodeEntry> ret;
+    Guard l(x_state);
+    for (auto s: m_state)
+        for (auto n: s.nodes)
+            ret.push_back(*n.lock());
+    return move(ret);
+}
+
 NodeTable::NodeEntry NodeTable::operator[](Address _id)
 {
     Guard l(x_nodes);
@@ -115,17 +125,16 @@ void NodeTable::doFindNode(Address _node, unsigned _round, std::shared_ptr<std::
 std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Address _target)
 {
     // send s_alpha FindNode packets to nodes we know, closest to target
+    static unsigned lastBin = s_bins - 1;
     unsigned head = dist(m_node.id, _target);
-    unsigned tail = (head - 1) % s_bins;
-    // todo: optimize with tree
+    unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins;

     std::map<unsigned, std::list<std::shared_ptr<NodeEntry>>> found;
     unsigned count = 0;

     // if d is 0, then we roll look forward, if last, we reverse, else, spread from d
-#pragma warning TODO: This should probably be s_bins instead of s_bits.
-    if (head != 0 && tail != s_bits)
-        while (head != tail && count < s_bucketSize)
+    if (head > 1 && tail != lastBin)
+        while (head != tail && head < s_bins && count < s_bucketSize)
         {
             Guard l(x_state);
             for (auto& n: m_state[head].nodes)
@@ -137,7 +146,7 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
                     break;
             }

-            if (count < s_bucketSize && head)
+            if (count < s_bucketSize && tail)
                 for (auto& n: m_state[tail].nodes)
                     if (auto p = n.lock())
                     {
@@ -146,11 +155,13 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
                         else
                             break;
                     }

             head++;
-            tail = (tail - 1) % s_bins;
+            if (tail)
+                tail--;
         }
-    else if (head == 0)
-        while (head < s_bucketSize && count < s_bucketSize)
+    else if (head < 2)
+        while (head < s_bins && count < s_bucketSize)
         {
             Guard l(x_state);
             for (auto& n: m_state[head].nodes)
@@ -161,9 +172,9 @@ std::vector<std::shared_ptr<NodeTable::NodeEntry>> NodeTable::findNearest(Addres
                     else
                         break;
                 }
-            head--;
+            head++;
         }
-    else if (tail == s_bins)
+    else
         while (tail > 0 && count < s_bucketSize)
         {
             Guard l(x_state);
@@ -221,8 +232,8 @@ void NodeTable::noteNode(Public _pubk, bi::udp::endpoint _endpoint)
     auto n = m_nodes.find(id);
     if (n == m_nodes.end())
     {
-        m_nodes[id] = std::shared_ptr<NodeEntry>(new NodeEntry(m_node, id, _pubk, _endpoint));
-        node = m_nodes[id];
+        node.reset(new NodeEntry(m_node, id, _pubk, _endpoint));
+        m_nodes[id] = node;
     }
     else
         node = n->second;
@@ -235,12 +246,11 @@ void NodeTable::noteNode(std::shared_ptr<NodeEntry> _n)
 {
     std::shared_ptr<NodeEntry> contested;
     {
-        NodeBucket s = bucket(_n.get());
+        NodeBucket& s = bucket(_n.get());
         Guard l(x_state);
         s.nodes.remove_if([&_n](std::weak_ptr<NodeEntry> n)
         {
-            auto p = n.lock();
-            if (!p || p == _n)
+            if (n.lock() == _n)
                 return true;
             return false;
         });
@@ -264,7 +274,7 @@ void NodeTable::noteNode(std::shared_ptr<NodeEntry> _n)

 void NodeTable::dropNode(std::shared_ptr<NodeEntry> _n)
 {
-    NodeBucket s = bucket(_n.get());
+    NodeBucket &s = bucket(_n.get());
     {
         Guard l(x_state);
         s.nodes.remove_if([&_n](std::weak_ptr<NodeEntry> n) { return n.lock() == _n; });
@@ -273,9 +283,9 @@ void NodeTable::dropNode(std::shared_ptr<NodeEntry> _n)
     m_nodes.erase(_n->id);
 }

-NodeTable::NodeBucket const& NodeTable::bucket(NodeEntry* _n) const
+NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n)
 {
-    return m_state[_n->distance];
+    return m_state[_n->distance - 1];
 }

 void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
@@ -286,12 +296,19 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
         return;
     }

-    /// 3 items is PingNode, 2 items w/no lists is FindNode, 2 items w/first item as list is Neighbors, 1 item is Pong
+    // 3 items is PingNode, 2 items w/no lists is FindNode, 2 items w/first item as list is Neighbors, 1 item is Pong
     bytesConstRef rlpBytes(_packet.cropped(65, _packet.size() - 65));
     RLP rlp(rlpBytes);
     unsigned itemCount = rlp.itemCount();
-    // bytesConstRef sig(_packet.cropped(0, 65)); // verify signature (deferred)
+    bytesConstRef sigBytes(_packet.cropped(0, 65));
+    Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(rlpBytes)));
+    if (!nodeid)
+    {
+        clog(NodeTableMessageSummary) << "Invalid Message Signature from " << _from.address().to_string() << ":" << _from.port();
+        return;
+    }
+    noteNode(nodeid, _from);

     try {
         switch (itemCount) {
@@ -300,8 +317,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
             clog(NodeTableMessageSummary) << "Received Pong from " << _from.address().to_string() << ":" << _from.port();
             Pong in = Pong::fromBytesConstRef(_from, rlpBytes);

-            // whenever a pong is received, first check if it's in m_evictions, if so, remove it
-            // otherwise check if we're expecting a pong. if we weren't, blacklist IP for 300 seconds
+            // whenever a pong is received, first check if it's in m_evictions

             break;
         }
@@ -329,7 +345,6 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
         case 3:
         {
             clog(NodeTableMessageSummary) << "Received PingNode from " << _from.address().to_string() << ":" << _from.port();

-            // todo: if we know the node, reply, otherwise ignore.
             PingNode in = PingNode::fromBytesConstRef(_from, rlpBytes);
             Pong p(_from);
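The dispatch rule restated in the onReceived() comment above (3 items is PingNode, 2 items with no list is FindNode, 2 items whose first item is a list is Neighbors, 1 item is Pong) reduces to a switch over RLP::itemCount(). A small standalone sketch of just that classification; RLP, itemCount() and isList() are the libdevcore calls the hunk already uses, while PacketKind and classify() are hypothetical:

    #include <libdevcore/RLP.h>

    using namespace dev;

    enum class PacketKind { Unknown, Ping, Pong, FindNode, Neighbors };

    // Classify a verified payload (the bytes after the 65-byte signature).
    PacketKind classify(bytesConstRef _rlpBytes)
    {
        RLP rlp(_rlpBytes);
        switch (rlp.itemCount())
        {
        case 1: return PacketKind::Pong;
        case 2: return rlp[0].isList() ? PacketKind::Neighbors : PacketKind::FindNode;
        case 3: return PacketKind::Ping;
        default: return PacketKind::Unknown;
        }
    }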

libp2p/NodeTable.h (33 changes)

@@ -38,19 +38,20 @@ namespace p2p
  * shared_ptr replacement instead of mutating values.
  *
  * [Interface]
- * @todo constructor support for m_node, m_secret
- * @todo don't try to evict node if node isRequired. (support for makeRequired)
- * @todo exclude bucket from refresh if we have node as peer (support for makeRequired)
- * @todo restore nodes
+ * @todo restore nodes: affects refreshbuckets
+ * @todo TCP endpoints
+ * @todo makeRequired: don't try to evict node if node isRequired.
+ * @todo makeRequired: exclude bucket from refresh if we have node as peer.
  * @todo std::shared_ptr<PingNode> m_cachedPingPacket;
  * @todo std::shared_ptr<FindNeighbors> m_cachedFindSelfPacket;
  *
  * [Networking]
- * @todo use eth/upnp/natpmp/stun/ice/etc for public-discovery
+ * @todo TCP endpoints
+ * @todo eth/upnp/natpmp/stun/ice/etc for public-discovery
  * @todo firewall
  *
  * [Protocol]
- * @todo ping newly added nodes for eviction
+ * @todo post-eviction pong
  * @todo optimize knowledge at opposite edges; eg, s_bitsPerStep lookups. (Can be done via pointers to NodeBucket)
  * @todo ^ s_bitsPerStep = 5; // Denoted by b in [Kademlia]. Bits by which address space is divided.
  * @todo optimize (use tree for state and/or custom compare for cache)
@@ -130,10 +131,13 @@ public:
     void join();

+    NodeEntry root() const { return NodeEntry(m_node, m_node.address(), m_node.publicKey(), m_node.endpoint.udp); }
     std::list<Address> nodes() const;
+    std::list<NodeEntry> state() const;
     NodeEntry operator[](Address _id);

 protected:
     /// Repeatedly sends s_alpha concurrent requests to nodes nearest to target, for nodes nearest to target, up to .
     void doFindNode(Address _node, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried = std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());
@@ -153,9 +157,9 @@ protected:
     void dropNode(std::shared_ptr<NodeEntry> _n);

-    NodeBucket const& bucket(NodeEntry* _n) const;
+    NodeBucket& bucket(NodeEntry const* _n);

-    /// Network Events
+    /// General Network Events

     void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet);
@@ -181,7 +185,7 @@ protected:
     mutable Mutex x_nodes;                                  ///< Mutable for thread-safe copy in nodes() const.
     std::map<Address, std::shared_ptr<NodeEntry>> m_nodes;  ///< Address -> Node table (most common lookup path)

-    Mutex x_state;
+    mutable Mutex x_state;
     std::array<NodeBucket, s_bins> m_state;                 ///< State table of binned nodes.

     Mutex x_evictions;
@@ -194,6 +198,15 @@ protected:
     boost::asio::deadline_timer m_evictionCheckTimer;       ///< Timer for handling node evictions.
 };

+std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable)
+{
+    _out << _nodeTable.root().address() << "\t" << "0\t" << _nodeTable.root().endpoint.udp.address() << ":" << _nodeTable.root().endpoint.udp.port() << std::endl;
+    auto s = _nodeTable.state();
+    for (auto n: s)
+        _out << n.address() << "\t" << n.distance << "\t" << n.endpoint.udp.address() << ":" << n.endpoint.udp.port() << std::endl;
+    return _out;
+}
+
 /**
  * Ping packet: Check if node is alive.
  * PingNode is cached and regenerated after expiration - t, where t is timeout.

libp2p/UDP.cpp (21 changes)

@@ -22,3 +22,24 @@
 #include "UDP.h"
 using namespace dev;
 using namespace dev::p2p;
+
+//template <class T>
+h256 RLPXDatagramFace::sign(Secret const& _k)
+{
+    RLPStream packet;
+    streamRLP(packet);
+    bytes b(packet.out());
+    h256 h(dev::sha3(b));
+    Signature sig = dev::sign(_k, h);
+    data.resize(b.size() + Signature::size);
+    sig.ref().copyTo(&data);
+    memcpy(data.data() + sizeof(Signature), b.data(), b.size());
+    return std::move(h);
+};
+
+//template <class T>
+Public RLPXDatagramFace::authenticate(bytesConstRef _sig, bytesConstRef _rlp)
+{
+    Signature const& sig = *(Signature const*)_sig.data();
+    return std::move(dev::recover(sig, sha3(_rlp)));
+};
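Taken together, sign() leaves data holding signature || rlp and authenticate() recovers the signer from those same two slices. A hypothetical round trip through the two new functions, assuming Pong is constructible from just an endpoint at this revision (as Pong p(_from) in the NodeTable.cpp hunk suggests) and that the include path below matches the tree:

    #include <cassert>
    #include <libp2p/NodeTable.h>   // Pong, RLPXDatagramFace; KeyPair comes in via libdevcrypto

    using namespace dev;
    using namespace dev::p2p;

    void roundTrip()
    {
        KeyPair k = KeyPair::create();
        Pong pong(bi::udp::endpoint(bi::address::from_string("127.0.0.1"), 30303));
        pong.sign(k.sec());                       // data = signature(65 bytes) || rlp

        bytesConstRef packet(&pong.data);
        Public sender = RLPXDatagramFace::authenticate(
            packet.cropped(0, 65), packet.cropped(65, packet.size() - 65));
        assert(sender == k.pub());                // recovered id matches the signer
    }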

libp2p/UDP.h (38 changes)

@@ -58,39 +58,28 @@ protected:
  * @brief RLPX Datagram which can be signed.
  * @todo compact templates
  * @todo make data private/functional (see UDPDatagram)
- * @todo valid=true/false (based on signature)
  */
-template <class T>
-struct RLPXDatagram: public UDPDatagram
+//template <class T>
+struct RLPXDatagramFace: public UDPDatagram
 {
-    static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
+//    static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
     static uint64_t futureFromEpoch(std::chrono::milliseconds _ms) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _ms).time_since_epoch()).count(); }
     static uint64_t futureFromEpoch(std::chrono::seconds _sec) { return std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count(); }
+    static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);

-    RLPXDatagram(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
+    RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
     virtual h256 sign(Secret const& _from);

     virtual void streamRLP(RLPStream&) const =0;
     virtual void interpretRLP(bytesConstRef _bytes) =0;
-
-protected:
-    Signature signature;
 };

 template <class T>
-h256 RLPXDatagram<T>::sign(Secret const& _k)
+struct RLPXDatagram: public RLPXDatagramFace
 {
-    RLPStream packet;
-    streamRLP(packet);
-    bytes b(packet.out());
-    h256 h(dev::sha3(b));
-    Signature sig = dev::sign(_k, h);
-    data.resize(b.size() + Signature::size);
-    sig.ref().copyTo(&data);
-    memcpy(data.data() + sizeof(Signature), b.data(), b.size());
-    return std::move(h);
-}
+    static T fromBytesConstRef(bi::udp::endpoint const& _ep, bytesConstRef _bytes) { T t(_ep); t.interpretRLP(_bytes); return std::move(t); }
+    using RLPXDatagramFace::RLPXDatagramFace;
+};

 /**
  * @brief Interface which UDPSocket will implement.
@@ -198,6 +187,9 @@ bool UDPSocket<Handler,MaxDatagramSize>::send(UDPDatagram const& _datagram)
 template <typename Handler, unsigned MaxDatagramSize>
 void UDPSocket<Handler,MaxDatagramSize>::doRead()
 {
+    if (m_closed)
+        return;
+
     auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
     m_socket.async_receive_from(boost::asio::buffer(recvData), recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
     {
@@ -206,14 +198,16 @@ void UDPSocket<Handler,MaxDatagramSize>::doRead()
         assert(_len);
         m_host.onReceived(this, recvEndpoint, bytesConstRef(recvData.data(), _len));
-        if (!m_closed)
-            doRead();
+        doRead();
     });
 }

 template <typename Handler, unsigned MaxDatagramSize>
 void UDPSocket<Handler,MaxDatagramSize>::doWrite()
 {
+    if (m_closed)
+        return;
+
     const UDPDatagram& datagram = sendQ[0];
     auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
     m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.endpoint(), [this, self](boost::system::error_code _ec, std::size_t)
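The other half of the commit message concerns shutdown order: doRead() and doWrite() now return before queuing any new asynchronous work once m_closed is set, so disconnect() followed by stopping the io_service and joining the worker thread leaves no handler that could touch a freed socket. A generic boost::asio sketch of the same pattern, roughly in the order the updated test fixtures use; Listener, m_closed and shutdown() are illustrative names, not the project's classes:

    #include <array>
    #include <atomic>
    #include <memory>
    #include <thread>
    #include <boost/asio.hpp>

    namespace ba = boost::asio;
    namespace bi = boost::asio::ip;

    struct Listener: std::enable_shared_from_this<Listener>
    {
        Listener(ba::io_service& _io, uint16_t _port):
            m_socket(_io, bi::udp::endpoint(bi::udp::v4(), _port)) {}

        void doRead()
        {
            if (m_closed)
                return;                            // never queue work after disconnect()
            auto self(shared_from_this());
            m_socket.async_receive_from(ba::buffer(m_buf), m_from,
                [this, self](boost::system::error_code _ec, size_t)
                {
                    if (_ec || m_closed)
                        return;                    // handler bails out too, so run() can drain
                    // ... hand the datagram to the owner here ...
                    doRead();
                });
        }

        void disconnect() { m_closed = true; boost::system::error_code ec; m_socket.close(ec); }

        std::atomic<bool> m_closed{false};
        bi::udp::socket m_socket;
        bi::udp::endpoint m_from;
        std::array<unsigned char, 1280> m_buf;
    };

    // Tear down before deallocation: stop issuing work, stop the io_service,
    // join the io thread, and only then release the object.
    void shutdown(std::shared_ptr<Listener> _l, ba::io_service& _io, std::thread& _t)
    {
        _l->disconnect();
        _io.stop();
        if (_t.joinable())
            _t.join();
        _l.reset();
    }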

test/net.cpp (34 changes)

@@ -42,6 +42,7 @@ public:
     virtual ~TestHost() { m_io.stop(); stopWorking(); }
     void start() { startWorking(); }
     void doWork() { m_io.run(); }
+    void doneWorking() { m_io.reset(); m_io.poll(); m_io.reset(); }

 protected:
     ba::io_service m_io;
@@ -52,18 +53,20 @@ struct TestNodeTable: public NodeTable
     /// Constructor
     using NodeTable::NodeTable;

-    void setup(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
+    void pingAll(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
     {
-        /// Phase 2 test: pre-populate *expected* ping-responses, send pings
         bi::address ourIp = bi::address::from_string("127.0.0.1");
-        uint16_t ourPort = 30300;
-        bi::udp::endpoint ourEndpoint(ourIp, ourPort);
         for (auto& n: _testNodes)
             ping(bi::udp::endpoint(ourIp, n.second));
     }

+    void populate(std::vector<std::pair<KeyPair,unsigned>> const& _testNodes)
+    {
+        bi::address ourIp = bi::address::from_string("127.0.0.1");
+        for (auto& n: _testNodes)
+            noteNode(n.first.pub(), bi::udp::endpoint(ourIp, n.second));
+    }
+
     void reset()
     {
         Guard l(x_state);
@@ -77,8 +80,9 @@ struct TestNodeTable: public NodeTable
 struct TestNodeTableHost: public TestHost
 {
     TestNodeTableHost(): m_alias(KeyPair::create()), nodeTable(new TestNodeTable(m_io, m_alias)) {};
+    ~TestNodeTableHost() { m_io.stop(); stopWorking(); }

-    void generateTestNodes(int _count = 10)
+    void generateTestNodes(int _count = 30)
     {
         asserts(_count < 1000);
         static uint16_t s_basePort = 30500;
@@ -96,7 +100,16 @@ struct TestNodeTableHost: public TestHost
     void setup()
     {
         generateTestNodes();
-        nodeTable->setup(m_testNodes);
+    }
+
+    void pingAll()
+    {
+        nodeTable->pingAll(m_testNodes);
+    }
+
+    void populate()
+    {
+        nodeTable->populate(m_testNodes);
     }

     KeyPair m_alias;
@@ -123,7 +136,10 @@ BOOST_AUTO_TEST_CASE(kademlia)
     node.start();
     node.nodeTable->join(); // ideally, joining with empty node table logs warning we can check for
     node.setup();
-    sleep(1);
+    node.pingAll();
+    this_thread::sleep_for(chrono::milliseconds(500));
+
+    cout << "NodeTable:\n" << *node.nodeTable.get() << endl;
 }

 BOOST_AUTO_TEST_CASE(test_txrx_one)
