Browse Source

First vaguely semi-functional version of whisper.

cl-refactor
Gav Wood 10 years ago
parent
commit
a7ce9a80f1
  1. 14
      exp/main.cpp
  2. 3
      libethcore/CMakeLists.txt
  3. 183
      libethcore/UPnP.cpp
  4. 53
      libethcore/UPnP.h
  5. 1
      libethereum/CMakeLists.txt
  6. 22
      libethereum/Client.cpp
  7. 1
      libethereum/CommonNet.h
  8. 1
      libethereum/EthereumHost.cpp
  9. 24
      libethereum/EthereumPeer.cpp
  10. 12
      libethereum/EthereumPeer.h
  11. 3
      libethnet/Common.h
  12. 2
      libethnet/PeerHost.cpp
  13. 2
      libethnet/PeerHost.h
  14. 3
      libethnet/PeerSession.h
  15. 1
      libwhisper/CMakeLists.txt
  16. 13
      libwhisper/Common.cpp
  17. 50
      libwhisper/Common.h
  18. 216
      libwhisper/WhisperPeer.cpp
  19. 200
      libwhisper/WhisperPeer.h

14
exp/main.cpp

@ -31,12 +31,14 @@
#include <libethential/CommonData.h>
#include <libethential/RLP.h>
#include <libethnet/All.h>
#include <libwhisper/WhisperPeer.h>
#if 0
#include <libevm/VM.h>
#include "BuildInfo.h"
#endif
using namespace std;
using namespace eth;
using namespace shh;
#if 0
#if 0
namespace qi = boost::spirit::qi;
@ -318,16 +320,22 @@ int main(int argc, char** argv)
}
PeerHost ph("Test", listenPort, "", false, true);
ph.registerCapability(new WhisperHost());
auto wh = ph.cap<WhisperHost>();
if (!remoteHost.empty())
ph.connect(remoteHost, remotePort);
/// Only interested in the packet if the lowest bit is 1
auto w = wh->installWatch(MessageFilter(std::vector<std::pair<bytes, bytes> >({{fromHex("0000000000000000000000000000000000000000000000000000000000000001"), fromHex("0000000000000000000000000000000000000000000000000000000000000001")}})));
for (int i = 0; ; ++i)
{
this_thread::sleep_for(chrono::milliseconds(100));
if (!(i % 100))
ph.pingAll();
this_thread::sleep_for(chrono::milliseconds(1000));
ph.process();
wh->sendRaw(h256(u256(i * i)).asBytes(), h256(u256(i)).asBytes(), 1000);
for (auto i: wh->checkWatch(w))
cnote << "New message:" << (u256)h256(wh->message(i).payload);
}
return 0;

3
libethcore/CMakeLists.txt

@ -18,9 +18,6 @@ include_directories(..)
target_link_libraries(${EXECUTABLE} ethential)
target_link_libraries(${EXECUTABLE} secp256k1)
target_link_libraries(${EXECUTABLE} gmp)
if(MINIUPNPC_LS)
target_link_libraries(${EXECUTABLE} ${MINIUPNPC_LS})
endif()
target_link_libraries(${EXECUTABLE} ${LEVELDB_LS})
target_link_libraries(${EXECUTABLE} ${CRYPTOPP_LS})

183
libethcore/UPnP.cpp

@ -1,183 +0,0 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file UPnP.cpp
* @authors:
* Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "UPnP.h"
#include <stdio.h>
#include <string.h>
#if ETH_MINIUPNPC
#include <miniupnpc/miniwget.h>
#include <miniupnpc/miniupnpc.h>
#include <miniupnpc/upnpcommands.h>
#endif
#include <libethential/Exceptions.h>
#include <libethential/Common.h>
#include <libethential/Log.h>
using namespace std;
using namespace eth;
UPnP::UPnP()
{
#if ETH_MINIUPNPC
m_urls.reset(new UPNPUrls);
m_data.reset(new IGDdatas);
m_ok = false;
struct UPNPDev* devlist;
struct UPNPDev* dev;
char* descXML;
int descXMLsize = 0;
int upnperror = 0;
memset(m_urls.get(), 0, sizeof(struct UPNPUrls));
memset(m_data.get(), 0, sizeof(struct IGDdatas));
devlist = upnpDiscover(2000, NULL/*multicast interface*/, NULL/*minissdpd socket path*/, 0/*sameport*/, 0/*ipv6*/, &upnperror);
if (devlist)
{
dev = devlist;
while (dev)
{
if (strstr (dev->st, "InternetGatewayDevice"))
break;
dev = dev->pNext;
}
if (!dev)
dev = devlist; /* defaulting to first device */
cnote << "UPnP device:" << dev->descURL << "[st:" << dev->st << "]";
#if MINIUPNPC_API_VERSION >= 9
descXML = (char*)miniwget(dev->descURL, &descXMLsize, 0);
#else
descXML = (char*)miniwget(dev->descURL, &descXMLsize);
#endif
if (descXML)
{
parserootdesc (descXML, descXMLsize, m_data.get());
free (descXML); descXML = 0;
#if MINIUPNPC_API_VERSION >= 9
GetUPNPUrls (m_urls.get(), m_data.get(), dev->descURL, 0);
#else
GetUPNPUrls (m_urls.get(), m_data.get(), dev->descURL);
#endif
m_ok = true;
}
freeUPNPDevlist(devlist);
}
else
#endif
{
cnote << "UPnP device not found.";
throw NoUPnPDevice();
}
}
UPnP::~UPnP()
{
auto r = m_reg;
for (auto i: r)
removeRedirect(i);
}
string UPnP::externalIP()
{
#if ETH_MINIUPNPC
char addr[16];
if (!UPNP_GetExternalIPAddress(m_urls->controlURL, m_data->first.servicetype, addr))
return addr;
else
#endif
return "0.0.0.0";
}
int UPnP::addRedirect(char const* _addr, int _port)
{
(void)_addr;
(void)_port;
#if ETH_MINIUPNPC
if (m_urls->controlURL[0] == '\0')
{
cwarn << "UPnP::addRedirect() called without proper initialisation?";
return -1;
}
// Try direct mapping first (port external, port internal).
char port_str[16];
sprintf(port_str, "%d", _port);
if (!UPNP_AddPortMapping(m_urls->controlURL, m_data->first.servicetype, port_str, port_str, _addr, "ethereum", "TCP", NULL, NULL))
return _port;
// Failed - now try (random external, port internal) and cycle up to 10 times.
for (uint i = 0; i < 10; ++i)
{
_port = rand() % 65535 - 1024 + 1024;
sprintf(port_str, "%d", _port);
if (!UPNP_AddPortMapping(m_urls->controlURL, m_data->first.servicetype, NULL, port_str, _addr, "ethereum", "TCP", NULL, NULL))
return _port;
}
// Failed. Try asking the router to give us a free external port.
if (UPNP_AddPortMapping(m_urls->controlURL, m_data->first.servicetype, port_str, NULL, _addr, "ethereum", "TCP", NULL, NULL))
// Failed. Exit.
return 0;
// We got mapped, but we don't know which ports we got mapped to. Now to find...
unsigned num = 0;
UPNP_GetPortMappingNumberOfEntries(m_urls->controlURL, m_data->first.servicetype, &num);
for (unsigned i = 0; i < num; ++i)
{
char extPort[16];
char intClient[16];
char intPort[6];
char protocol[4];
char desc[80];
char enabled[4];
char rHost[64];
char duration[16];
UPNP_GetGenericPortMappingEntry(m_urls->controlURL, m_data->first.servicetype, toString(i).c_str(), extPort, intClient, intPort, protocol, desc, enabled, rHost, duration);
if (string("ethereum") == desc)
{
m_reg.insert(atoi(extPort));
return atoi(extPort);
}
}
cerr << "ERROR: Mapped port not found." << endl;
#endif
return 0;
}
void UPnP::removeRedirect(int _port)
{
(void)_port;
#if ETH_MINIUPNPC
char port_str[16];
// int t;
printf("TB : upnp_rem_redir (%d)\n", _port);
if (m_urls->controlURL[0] == '\0')
{
printf("TB : the init was not done !\n");
return;
}
sprintf(port_str, "%d", _port);
UPNP_DeletePortMapping(m_urls->controlURL, m_data->first.servicetype, port_str, "TCP", NULL);
m_reg.erase(_port);
#endif
}

53
libethcore/UPnP.h

@ -1,53 +0,0 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file UPnP.h
* @authors:
* Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include <set>
#include <string>
#include <memory>
struct UPNPUrls;
struct IGDdatas;
namespace eth
{
class UPnP
{
public:
UPnP();
~UPnP();
std::string externalIP();
int addRedirect(char const* addr, int port);
void removeRedirect(int port);
bool isValid() const { return m_ok; }
std::set<int> m_reg;
bool m_ok;
std::shared_ptr<struct UPNPUrls> m_urls;
std::shared_ptr<struct IGDdatas> m_data;
};
}

1
libethereum/CMakeLists.txt

@ -19,6 +19,7 @@ include_directories(..)
target_link_libraries(${EXECUTABLE} evm)
target_link_libraries(${EXECUTABLE} lll)
target_link_libraries(${EXECUTABLE} whisper)
target_link_libraries(${EXECUTABLE} ethcore)
target_link_libraries(${EXECUTABLE} ethnet)
target_link_libraries(${EXECUTABLE} secp256k1)

22
libethereum/Client.cpp

@ -162,6 +162,17 @@ void Client::uninstallWatch(unsigned _i)
m_filters.erase(fit);
}
void Client::noteChanged(h256Set const& _filters)
{
lock_guard<mutex> l(m_filterLock);
for (auto& i: m_watches)
if (_filters.count(i.second.id))
{
cwatch << "!!!" << i.first << i.second.id;
i.second.changes++;
}
}
void Client::appendFromNewPending(h256 _bloom, h256Set& o_changed) const
{
lock_guard<mutex> l(m_filterLock);
@ -180,17 +191,6 @@ void Client::appendFromNewBlock(h256 _block, h256Set& o_changed) const
o_changed.insert(i.first);
}
void Client::noteChanged(h256Set const& _filters)
{
lock_guard<mutex> l(m_filterLock);
for (auto& i: m_watches)
if (_filters.count(i.second.id))
{
cwatch << "!!!" << i.first << i.second.id;
i.second.changes++;
}
}
void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHost, unsigned short _port, NodeMode _mode, unsigned _peers, string const& _publicIP, bool _upnp, u256 _networkId)
{
static const char* c_threadName = "net";

1
libethereum/CommonNet.h

@ -36,7 +36,6 @@ static const eth::uint c_maxHashesAsk = 32; ///< Maximum number of hashes GetBlo
static const eth::uint c_maxBlocks = 16; ///< Maximum number of blocks Blocks will ever send.
static const eth::uint c_maxBlocksAsk = 16; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
class UPnP;
class OverlayDB;
class BlockChain;
class TransactionQueue;

1
libethereum/EthereumHost.cpp

@ -29,7 +29,6 @@
#include <libethential/Common.h>
#include <libethnet/PeerHost.h>
#include <libethnet/PeerSession.h>
#include <libethcore/UPnP.h>
#include <libethcore/Exceptions.h>
#include "BlockChain.h"
#include "TransactionQueue.h"

24
libethereum/EthereumPeer.cpp

@ -30,7 +30,7 @@
using namespace std;
using namespace eth;
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->id() << "] "
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
EthereumPeer::EthereumPeer(PeerSession* _s, HostCapabilityFace* _h): PeerCapability(_s, _h)
{
@ -100,7 +100,7 @@ inline string toString(h256s const& _bs)
void EthereumPeer::giveUpOnFetch()
{
clogS(NetNote) << "GIVE UP FETCH; can't get " << toString(m_askedBlocks);
clogS(NetNote) << "GIVE UP FETCH; can't get" << toString(m_askedBlocks);
if (m_askedBlocks.size())
{
Guard l (host()->x_blocksNeeded);
@ -127,7 +127,7 @@ bool EthereumPeer::interpret(RLP const& _r)
m_latestHash = _r[4].toHash<h256>();
auto genesisHash = _r[5].toHash<h256>();
clogS(NetMessageSummary) << "Status: " << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
if (genesisHash != host()->m_chain->genesisHash())
disable("Invalid genesis hash");
@ -145,7 +145,7 @@ bool EthereumPeer::interpret(RLP const& _r)
break;
}
case TransactionsPacket:
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << " entries)";
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)";
addRating(_r.itemCount() - 1);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
@ -157,7 +157,7 @@ bool EthereumPeer::interpret(RLP const& _r)
{
h256 later = _r[1].toHash<h256>();
unsigned limit = _r[2].toInt<unsigned>();
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries, " << later.abridged() << ")";
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")";
unsigned c = min<unsigned>(host()->m_chain->number(later), limit);
@ -171,7 +171,7 @@ bool EthereumPeer::interpret(RLP const& _r)
}
case BlockHashesPacket:
{
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << " entries)";
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << "entries)";
if (_r.itemCount() == 1)
{
host()->noteHaveChain(this);
@ -197,7 +197,7 @@ bool EthereumPeer::interpret(RLP const& _r)
}
case GetBlocksPacket:
{
clogS(NetMessageSummary) << "GetBlocks (" << dec << (_r.itemCount() - 1) << " entries)";
clogS(NetMessageSummary) << "GetBlocks (" << dec << (_r.itemCount() - 1) << "entries)";
// TODO: return the requested blocks.
bytes rlp;
unsigned n = 0;
@ -216,7 +216,7 @@ bool EthereumPeer::interpret(RLP const& _r)
}
case BlocksPacket:
{
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << " entries)";
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)";
if (_r.itemCount() == 1 && !m_askedBlocksChanged)
{
@ -246,16 +246,16 @@ bool EthereumPeer::interpret(RLP const& _r)
if (!host()->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{
unknownParents++;
clogS(NetAllDetail) << "Unknown parent " << bi.parentHash << " of block " << h;
clogS(NetAllDetail) << "Unknown parent" << bi.parentHash << "of block" << h;
}
else
{
knownParents++;
clogS(NetAllDetail) << "Known parent " << bi.parentHash << " of block " << h;
clogS(NetAllDetail) << "Known parent" << bi.parentHash << "of block" << h;
}
}
}
clogS(NetMessageSummary) << dec << knownParents << " known parents, " << unknownParents << "unknown, " << used << "used.";
clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used.";
continueGettingChain();
break;
}
@ -301,7 +301,7 @@ void EthereumPeer::continueGettingChain()
}
else
{
clogS(NetMessageSummary) << "No blocks left to get. Peer doesn't seem to have " << m_failedBlocks.size() << "of our needed blocks.";
clogS(NetMessageSummary) << "No blocks left to get. Peer doesn't seem to have" << m_failedBlocks.size() << "of our needed blocks.";
host()->noteDoneBlocks();
}
}

12
libethereum/EthereumPeer.h

@ -36,18 +36,6 @@ namespace eth
class HostCapabilityFace;
class WhisperSession: public PeerCapability
{
public:
WhisperSession();
virtual ~WhisperSession();
static std::string name() { return "shh"; }
private:
virtual bool interpret(RLP const&) { return false; }
};
/**
* @brief The EthereumPeer class
* @todo Document fully.

3
libethnet/Common.h

@ -110,7 +110,7 @@ public:
protected:
virtual std::string name() const = 0;
virtual PeerCapability* newPeerCapability(PeerSession* _s) = 0;
virtual bool isInitialised() const = 0;
virtual bool isInitialised() const { return true; }
void seal(bytes& _b);
@ -128,7 +128,6 @@ public:
static std::string staticName() { return PeerCap::name(); }
protected:
virtual bool isInitialised() const = 0;
virtual std::string name() const { return PeerCap::name(); }
virtual PeerCapability* newPeerCapability(PeerSession* _s) { return new PeerCap(_s, this); }
};

2
libethnet/PeerHost.cpp

@ -35,9 +35,9 @@
#include <chrono>
#include <thread>
#include <libethential/Common.h>
#include <libethcore/UPnP.h>
#include <libethcore/Exceptions.h>
#include "PeerSession.h"
#include "UPnP.h"
using namespace std;
using namespace eth;

2
libethnet/PeerHost.h

@ -107,6 +107,8 @@ public:
/// Deserialise the data and populate the set of known peers.
void restorePeers(bytesConstRef _b);
h512 id() const { return m_key.pub(); }
void registerPeer(std::shared_ptr<PeerSession> _s, std::vector<std::string> const& _caps);
protected:

3
libethnet/PeerSession.h

@ -53,7 +53,8 @@ public:
bool isOpen() const { return m_socket.is_open(); }
unsigned id() const { return m_socket.native_handle(); }
h512 id() const { return m_id; }
unsigned socketId() const { return m_socket.native_handle(); }
bi::tcp::endpoint endpoint() const; ///< for other peers to connect to.

1
libwhisper/CMakeLists.txt

@ -20,6 +20,7 @@ include_directories(..)
target_link_libraries(${EXECUTABLE} evm)
target_link_libraries(${EXECUTABLE} lll)
target_link_libraries(${EXECUTABLE} ethential)
target_link_libraries(${EXECUTABLE} ethcore)
target_link_libraries(${EXECUTABLE} secp256k1)
if(MINIUPNPC_LS)
target_link_libraries(${EXECUTABLE} ${MINIUPNPC_LS})

13
libwhisper/Whisper.cpp → libwhisper/Common.cpp

@ -14,21 +14,14 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Whisper.cpp
/** @file Common.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "Whisper.h"
#include "Common.h"
#include <libethential/Log.h>
using namespace std;
using namespace eth;
using namespace shh;
Whisper::Whisper()
{
}
Whisper::~Whisper()
{
}

50
libwhisper/Whisper.h → libwhisper/Common.h

@ -14,37 +14,43 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Whisper.h
/** @file Common.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
namespace eth
{
/*
class NetPeer
{
public:
NetPeer();
virtual ~NetPeer();
#include <string>
#include <chrono>
#include <libethential/Common.h>
#include <libethential/Log.h>
#include <libethcore/CommonEth.h>
#include <libethnet/Common.h>
protected:
virtual void onIncoming(PeerId);
void send(PeerId);
};
*/
/**
*/
class Whisper//: public NetPeer
namespace shh
{
public:
/// Constructor.
Whisper();
/// Destructor.
virtual ~Whisper();
using h256 = eth::h256;
using h512 = eth::h512;
using h256s = eth::h256s;
using bytes = eth::bytes;
using RLPStream = eth::RLPStream;
using RLP = eth::RLP;
using bytesRef = eth::bytesRef;
using bytesConstRef = eth::bytesConstRef;
using h256Set = eth::h256Set;
class WhisperHost;
class WhisperPeer;
class Whisper;
enum WhisperPacket
{
StatusPacket = 0x20,
MessagesPacket,
AddFilterPacket,
RemoveFilterPacket
};
}

216
libwhisper/WhisperPeer.cpp

@ -0,0 +1,216 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Whisper.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "WhisperPeer.h"
#include <libethential/Log.h>
#include <libethnet/All.h>
using namespace std;
using namespace eth;
using namespace shh;
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
WhisperPeer::WhisperPeer(PeerSession* _s, HostCapabilityFace* _h): PeerCapability(_s, _h)
{
RLPStream s;
prep(s);
sealAndSend(s.appendList(2) << StatusPacket << host()->protocolVersion());
}
WhisperPeer::~WhisperPeer()
{
}
WhisperHost* WhisperPeer::host() const
{
return static_cast<WhisperHost*>(PeerCapability::hostCapability());
}
bool WhisperPeer::interpret(RLP const& _r)
{
switch (_r[0].toInt<unsigned>())
{
case StatusPacket:
{
auto protocolVersion = _r[1].toInt<unsigned>();
clogS(NetMessageSummary) << "Status: " << protocolVersion;
if (protocolVersion != host()->protocolVersion())
disable("Invalid protocol version.");
if (session()->id() < host()->host()->id())
sendMessages();
break;
}
case MessagesPacket:
{
unsigned n = 0;
for (auto i: _r)
if (n++)
host()->inject(Message(i), this);
sendMessages();
break;
}
default:
return false;
}
return true;
}
void WhisperPeer::sendMessages()
{
RLPStream amalg;
unsigned n = 0;
Guard l(x_unseen);
while (m_unseen.size())
{
auto p = *m_unseen.begin();
m_unseen.erase(m_unseen.begin());
host()->streamMessage(p.second, amalg);
n++;
}
// pause before sending if no messages to send
if (!n)
this_thread::sleep_for(chrono::milliseconds(100));
RLPStream s;
prep(s);
s.appendList(n + 1) << MessagesPacket;
s.appendRaw(amalg.out(), n);
sealAndSend(s);
}
void WhisperPeer::noteNewMessage(h256 _h, Message const& _m)
{
Guard l(x_unseen);
m_unseen[rating(_m)] = _h;
}
WhisperHost::WhisperHost()
{
}
WhisperHost::~WhisperHost()
{
}
void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const
{
UpgradableGuard l(x_messages);
if (m_messages.count(_m))
{
UpgradeGuard ll(l);
m_messages.at(_m).streamOut(_s);
}
}
void WhisperHost::inject(Message const& _m, WhisperPeer* _p)
{
auto h = _m.sha3();
{
UpgradableGuard l(x_messages);
if (m_messages.count(h))
return;
UpgradeGuard ll(l);
m_messages[h] = _m;
}
if (_p)
{
Guard l(m_filterLock);
for (auto const& f: m_filters)
if (f.second.filter.matches(_m))
noteChanged(h, f.first);
}
for (auto& i: peers())
if (i->cap<WhisperPeer>().get() == _p)
i->addRating(1);
else
i->cap<WhisperPeer>()->noteNewMessage(h, _m);
}
void WhisperHost::noteChanged(h256 _messageHash, h256 _filter)
{
for (auto& i: m_watches)
if (i.second.id == _filter)
{
cwatch << "!!!" << i.first << i.second.id;
i.second.changes.push_back(_messageHash);
}
}
bool MessageFilter::matches(Message const& _m) const
{
for (auto const& t: m_topicMasks)
{
if (t.first.size() != t.second.size() || _m.topic.size() < t.first.size())
continue;
for (unsigned i = 0; i < t.first.size(); ++i)
if (((t.first[i] ^ _m.topic[i]) & t.second[i]) != 0)
goto NEXT;
return true;
NEXT:;
}
return false;
}
unsigned WhisperHost::installWatch(h256 _h)
{
auto ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
m_watches[ret] = ClientWatch(_h);
cwatch << "+++" << ret << _h;
return ret;
}
unsigned WhisperHost::installWatch(shh::MessageFilter const& _f)
{
Guard l(m_filterLock);
h256 h = _f.sha3();
if (!m_filters.count(h))
m_filters.insert(make_pair(h, _f));
return installWatch(h);
}
void WhisperHost::uninstallWatch(unsigned _i)
{
cwatch << "XXX" << _i;
Guard l(m_filterLock);
auto it = m_watches.find(_i);
if (it == m_watches.end())
return;
auto id = it->second.id;
m_watches.erase(it);
auto fit = m_filters.find(id);
if (fit != m_filters.end())
if (!--fit->second.refCount)
m_filters.erase(fit);
}

200
libwhisper/WhisperPeer.h

@ -0,0 +1,200 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Whisper.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include <mutex>
#include <array>
#include <set>
#include <memory>
#include <utility>
#include <libethential/RLP.h>
#include <libethential/Guards.h>
#include <libethcore/SHA3.h>
#include "Common.h"
namespace shh
{
using eth::PeerSession;
using eth::HostCapabilityFace;
using eth::HostCapability;
struct Message
{
unsigned expiry = 0;
unsigned ttl = 0;
bytes topic;
bytes payload;
Message() {}
Message(unsigned _exp, unsigned _ttl, bytes const& _topic, bytes const& _payload): expiry(_exp), ttl(_ttl), topic(_topic), payload(_payload) {}
Message(RLP const& _m)
{
expiry = _m[0].toInt<unsigned>();
ttl = _m[1].toInt<unsigned>();
topic = _m[2].toBytes();
payload = _m[3].toBytes();
}
operator bool () const { return !!expiry; }
void streamOut(RLPStream& _s) const { _s.appendList(4) << expiry << ttl << topic << payload; }
h256 sha3() const { RLPStream s; streamOut(s); return eth::sha3(s.out()); }
};
/**
*/
class WhisperPeer: public eth::PeerCapability
{
friend class WhisperHost;
public:
WhisperPeer(PeerSession* _s, HostCapabilityFace* _h);
virtual ~WhisperPeer();
static std::string name() { return "shh"; }
WhisperHost* host() const;
private:
virtual bool interpret(RLP const&);
void sendMessages();
unsigned rating(Message const&) const { return 0; } // TODO
void noteNewMessage(h256 _h, Message const& _m);
mutable eth::Mutex x_unseen;
std::map<unsigned, h256> m_unseen; ///< Rated according to what they want.
};
class MessageFilter
{
public:
MessageFilter() {}
MessageFilter(std::vector<std::pair<bytes, bytes> > const& _m): m_topicMasks(_m) {}
MessageFilter(RLP const& _r): m_topicMasks((std::vector<std::pair<bytes, bytes> >)_r) {}
void fillStream(RLPStream& _s) const { _s << m_topicMasks; }
h256 sha3() const { RLPStream s; fillStream(s); return eth::sha3(s.out()); }
bool matches(Message const& _m) const;
private:
std::vector<std::pair<bytes, bytes> > m_topicMasks;
};
struct InstalledFilter
{
InstalledFilter(MessageFilter const& _f): filter(_f) {}
MessageFilter filter;
unsigned refCount = 1;
};
struct ClientWatch
{
ClientWatch() {}
explicit ClientWatch(h256 _id): id(_id) {}
h256 id;
h256s changes;
};
class WhisperHost: public HostCapability<WhisperPeer>
{
friend class WhisperPeer;
public:
WhisperHost();
virtual ~WhisperHost();
unsigned protocolVersion() const { return 0; }
void inject(Message const& _m, WhisperPeer* _from = nullptr);
unsigned installWatch(MessageFilter const& _filter);
unsigned installWatch(h256 _filterId);
void uninstallWatch(unsigned _watchId);
h256s peekWatch(unsigned _watchId) const { eth::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
h256s checkWatch(unsigned _watchId) { eth::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
Message message(h256 _m) const { try { eth::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Message(); } }
void sendRaw(bytes const& _payload, bytes const& _topic, unsigned _ttl) { inject(Message(time(0) + _ttl, _ttl, _topic, _payload)); }
private:
void streamMessage(h256 _m, RLPStream& _s) const;
void noteChanged(h256 _messageHash, h256 _filter);
mutable eth::SharedMutex x_messages;
std::map<h256, Message> m_messages;
mutable eth::Mutex m_filterLock;
std::map<h256, InstalledFilter> m_filters;
std::map<unsigned, ClientWatch> m_watches;
};
struct WatchChannel: public eth::LogChannel { static const char* name() { return "shh"; } static const int verbosity = 1; };
#define cwatch eth::LogOutputStream<shh::WatchChannel, true>()
class Watch;
}
/*
namespace std { void swap(shh::Watch& _a, shh::Watch& _b); }
namespace shh
{
class Watch: public boost::noncopyable
{
friend void std::swap(Watch& _a, Watch& _b);
public:
Watch() {}
Watch(Whisper& _c, h256 _f): m_c(&_c), m_id(_c.installWatch(_f)) {}
Watch(Whisper& _c, MessageFilter const& _tf): m_c(&_c), m_id(_c.installWatch(_tf)) {}
~Watch() { if (m_c) m_c->uninstallWatch(m_id); }
bool check() { return m_c ? m_c->checkWatch(m_id) : false; }
bool peek() { return m_c ? m_c->peekWatch(m_id) : false; }
private:
Whisper* m_c;
unsigned m_id;
};
}
namespace shh
{
inline void swap(shh::Watch& _a, shh::Watch& _b)
{
swap(_a.m_c, _b.m_c);
swap(_a.m_id, _b.m_id);
}
}
*/
Loading…
Cancel
Save