Browse Source

prep for framing. remove unused raw network send() methods from capabilities. bump protocol version. back out magic sequence packet prefix.

cl-refactor
subtly 10 years ago
parent
commit
7386ad2606
  1. 10
      libp2p/Capability.cpp
  2. 3
      libp2p/Capability.h
  3. 16
      libp2p/Host.cpp
  4. 69
      libp2p/Session.cpp
  5. 5
      libp2p/Session.h

10
libp2p/Capability.cpp

@ -53,16 +53,6 @@ void Capability::sealAndSend(RLPStream& _s)
m_session->sealAndSend(_s); m_session->sealAndSend(_s);
} }
void Capability::send(bytesConstRef _msg)
{
m_session->send(_msg);
}
void Capability::send(bytes&& _msg)
{
m_session->send(move(_msg));
}
void Capability::addRating(unsigned _r) void Capability::addRating(unsigned _r)
{ {
m_session->addRating(_r); m_session->addRating(_r);

3
libp2p/Capability.h

@ -52,9 +52,6 @@ protected:
RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0); RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void send(bytes&& _msg);
void send(bytesConstRef _msg);
void addRating(unsigned _r); void addRating(unsigned _r);
private: private:

16
libp2p/Host.cpp

@ -154,7 +154,7 @@ void Host::doneWorking()
unsigned Host::protocolVersion() const unsigned Host::protocolVersion() const
{ {
return 3; return 4;
} }
bool Host::startPeerSession(Public const& _id, RLP const& _rlp, bi::tcp::socket *_socket) bool Host::startPeerSession(Public const& _id, RLP const& _rlp, bi::tcp::socket *_socket)
@ -273,15 +273,11 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
void Host::seal(bytes& _b) void Host::seal(bytes& _b)
{ {
_b[0] = 0x22; uint32_t len = (uint32_t)_b.size() - 4;
_b[1] = 0x40; _b[0] = (len >> 24) & 0xff;
_b[2] = 0x08; _b[1] = (len >> 16) & 0xff;
_b[3] = 0x91; _b[2] = (len >> 8) & 0xff;
uint32_t len = (uint32_t)_b.size() - 8; _b[3] = len & 0xff;
_b[4] = (len >> 24) & 0xff;
_b[5] = (len >> 16) & 0xff;
_b[6] = (len >> 8) & 0xff;
_b[7] = len & 0xff;
} }
void Host::determinePublic(string const& _publicAddress, bool _upnp) void Host::determinePublic(string const& _publicAddress, bool _upnp)

69
libp2p/Session.cpp

@ -281,7 +281,7 @@ RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
RLPStream& Session::prep(RLPStream& _s) RLPStream& Session::prep(RLPStream& _s)
{ {
return _s.appendRaw(bytes(8, 0)); return _s.appendRaw(bytes(4, 0));
} }
void Session::sealAndSend(RLPStream& _s) void Session::sealAndSend(RLPStream& _s)
@ -294,14 +294,12 @@ void Session::sealAndSend(RLPStream& _s)
bool Session::checkPacket(bytesConstRef _msg) bool Session::checkPacket(bytesConstRef _msg)
{ {
if (_msg.size() < 8) if (_msg.size() < 5)
return false; return false;
if (!(_msg[0] == 0x22 && _msg[1] == 0x40 && _msg[2] == 0x08 && _msg[3] == 0x91)) uint32_t len = ((_msg[0] * 256 + _msg[1]) * 256 + _msg[2]) * 256 + _msg[3];
if (_msg.size() != len + 4)
return false; return false;
uint32_t len = ((_msg[4] * 256 + _msg[5]) * 256 + _msg[6]) * 256 + _msg[7]; RLP r(_msg.cropped(4));
if (_msg.size() != len + 8)
return false;
RLP r(_msg.cropped(8));
if (r.actualSize() != len) if (r.actualSize() != len)
return false; return false;
return true; return true;
@ -314,7 +312,7 @@ void Session::send(bytesConstRef _msg)
void Session::send(bytes&& _msg) void Session::send(bytes&& _msg)
{ {
clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(8)); clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(4));
if (!checkPacket(bytesConstRef(&_msg))) if (!checkPacket(bytesConstRef(&_msg)))
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!"; clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
@ -410,68 +408,53 @@ void Session::start()
void Session::doRead() void Session::doRead()
{ {
// ignore packets received while waiting to disconnect // ignore packets received while waiting to disconnect.
if (m_dropped) if (m_dropped)
return; return;
auto self(shared_from_this()); auto self(shared_from_this());
m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length) m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length)
{ {
// If error is end of file, ignore
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{ {
// got here with length of 1241...
clogS(NetWarn) << "Error reading: " << ec.message(); clogS(NetWarn) << "Error reading: " << ec.message();
drop(TCPError); drop(TCPError);
} }
else if (ec && length == 0) else if (ec && length == 0)
{
return; return;
}
else else
{ {
try try
{ {
m_incoming.resize(m_incoming.size() + length); m_incoming.resize(m_incoming.size() + length);
memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length); memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length);
while (m_incoming.size() > 8)
// 4 bytes for length header
const uint32_t c_hLen = 4;
while (m_incoming.size() > c_hLen)
{ {
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) // break if data recvd is less than expected size of packet.
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data(), c_hLen));
uint32_t tlen = c_hLen + len;
if (m_incoming.size() < tlen)
break;
bytesConstRef frame(m_incoming.data(), tlen);
bytesConstRef packet = frame.cropped(c_hLen);
if (!checkPacket(frame))
{ {
clogS(NetWarn) << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4)); cerr << "Received " << packet.size() << ": " << toHex(packet) << endl;
clogS(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol); disconnect(BadProtocol);
return; return;
} }
else else
{ {
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4)); RLP r(packet);
uint32_t tlen = len + 8; if (!interpret(r))
if (m_incoming.size() < tlen) clogS(NetWarn) << "Couldn't interpret packet." << RLP(r);
break;
// enough has come in.
auto data = bytesConstRef(m_incoming.data(), tlen);
if (!checkPacket(data))
{
cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl;
clogS(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
else
{
RLP r(data.cropped(8));
if (!interpret(r))
{
// error - bad protocol
clogS(NetWarn) << "Couldn't interpret packet." << RLP(r);
// Just wasting our bandwidth - perhaps reduce rating?
//return;
}
}
memmove(m_incoming.data(), m_incoming.data() + tlen, m_incoming.size() - tlen);
m_incoming.resize(m_incoming.size() - tlen);
} }
memmove(m_incoming.data(), m_incoming.data() + tlen, m_incoming.size() - tlen);
m_incoming.resize(m_incoming.size() - tlen);
} }
doRead(); doRead();
} }

5
libp2p/Session.h

@ -71,8 +71,6 @@ public:
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
static RLPStream& prep(RLPStream& _s); static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void send(bytes&& _msg);
void send(bytesConstRef _msg);
int rating() const; int rating() const;
void addRating(unsigned _r); void addRating(unsigned _r);
@ -85,6 +83,9 @@ public:
void serviceNodesRequest(); void serviceNodesRequest();
private: private:
void send(bytes&& _msg);
void send(bytesConstRef _msg);
/// Drop the connection for the reason @a _r. /// Drop the connection for the reason @a _r.
void drop(DisconnectReason _r); void drop(DisconnectReason _r);

Loading…
Cancel
Save