From aa094e5240059ff060531416c41af7d29f4b3907 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 27 Jan 2015 16:56:39 -0800 Subject: [PATCH] LogFilter supports new, better, filter mechanism. Exposed to JS API. --- alethzero/MainWin.cpp | 2 +- libethereum/LogFilter.cpp | 23 +-- libethereum/LogFilter.h | 4 +- libsolidity/CompilerStack.cpp | 2 +- libweb3jsonrpc/WebThreeStubServerBase.cpp | 16 +- libwhisper/WhisperPeer.cpp | 3 + test/whisperTopic.cpp | 184 ++++++++++++++++++++++ third/MainWin.cpp | 10 +- 8 files changed, 217 insertions(+), 27 deletions(-) diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index 1fbed013c..c4268e9b1 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -311,7 +311,7 @@ void Main::installBalancesWatch() altCoins.push_back(right160(ethereum()->stateAt(coinsAddr, i + 1))); for (auto i: m_myKeys) for (auto c: altCoins) - tf.address(c).topic(h256(i.address(), h256::AlignRight)); + tf.address(c).topic(0, h256(i.address(), h256::AlignRight)); uninstallWatch(m_balancesFilter); m_balancesFilter = installWatch(tf, [=](LocalisedLogEntries const&){ onBalancesChange(); }); diff --git a/libethereum/LogFilter.cpp b/libethereum/LogFilter.cpp index 65e819b98..79f43f2c6 100644 --- a/libethereum/LogFilter.cpp +++ b/libethereum/LogFilter.cpp @@ -43,20 +43,21 @@ bool LogFilter::matches(LogBloom _bloom) const { if (m_addresses.size()) { - for (auto i: m_addresses) + for (auto const& i: m_addresses) if (_bloom.containsBloom<3>(dev::sha3(i))) goto OK1; return false; } OK1: - if (m_topics.size()) - { - for (auto i: m_topics) - if (_bloom.containsBloom<3>(dev::sha3(i))) - goto OK2; - return false; - } - OK2: + for (auto const& t: m_topics) + if (t.size()) + { + for (auto const& i: t) + if (_bloom.containsBloom<3>(dev::sha3(i))) + goto OK2; + return false; + OK2:; + } return true; } @@ -73,8 +74,8 @@ LogEntries LogFilter::matches(TransactionReceipt const& _m) const { if (!m_addresses.empty() && !m_addresses.count(e.address)) goto continue2; - for (auto const& t: m_topics) - if (!std::count(e.topics.begin(), e.topics.end(), t)) + for (unsigned i = 0; i < 4; ++i) + if (!m_topics[i].empty() && (e.topics.size() < i || !m_topics[i].count(e.topics[i]))) goto continue2; ret.push_back(e); continue2:; diff --git a/libethereum/LogFilter.h b/libethereum/LogFilter.h index bda8e46a2..b99ba8ee0 100644 --- a/libethereum/LogFilter.h +++ b/libethereum/LogFilter.h @@ -50,7 +50,7 @@ public: LogEntries matches(TransactionReceipt const& _r) const; LogFilter address(Address _a) { m_addresses.insert(_a); return *this; } - LogFilter topic(h256 const& _t) { m_topics.insert(_t); return *this; } + LogFilter topic(unsigned _index, h256 const& _t) { if (_index < 4) m_topics[_index].insert(_t); return *this; } LogFilter withMax(unsigned _m) { m_max = _m; return *this; } LogFilter withSkip(unsigned _m) { m_skip = _m; return *this; } LogFilter withEarliest(int _e) { m_earliest = _e; return *this; } @@ -58,7 +58,7 @@ public: private: AddressSet m_addresses; - h256Set m_topics; + std::array m_topics; int m_earliest = 0; int m_latest = -1; unsigned m_max; diff --git a/libsolidity/CompilerStack.cpp b/libsolidity/CompilerStack.cpp index a8ba037f2..2917a382a 100644 --- a/libsolidity/CompilerStack.cpp +++ b/libsolidity/CompilerStack.cpp @@ -145,7 +145,7 @@ string CompilerStack::expanded(string const& _sourceCode) { "CoinReg", "contract CoinReg{function count()constant returns(uint256 r){}function info(uint256 i)constant returns(address addr,string3 name,uint256 denom){}function register(string3 name,uint256 denom){}function unregister(){}}" }, { "coin", "#require CoinReg\ncontract coin {function coin(string3 name, uint denom) {CoinReg(Config().lookup(3)).register(name, denom);}}" }, { "service", "#require Config\ncontract service{function service(uint _n){Config().register(_n, this);}}" }, - { "owned", "contract owned{function owned(){owner = msg.sender;}address owner;}" }, + { "owned", "contract owned{function owned(){owner = msg.sender;}modifier onlyowner(){if(msg.sender==owner)_}address owner;}" }, { "mortal", "#require owned\ncontract mortal is owned {function kill() { if (msg.sender == owner) suicide(owner); }}" }, { "NameReg", "contract NameReg{function register(string32 name){}function addressOf(string32 name)constant returns(address addr){}function unregister(){}function nameOf(address addr)constant returns(string32 name){}}" }, { "named", "#require Config NameReg\ncontract named {function named(string32 name) {NameReg(Config().lookup(1)).register(name);}}" }, diff --git a/libweb3jsonrpc/WebThreeStubServerBase.cpp b/libweb3jsonrpc/WebThreeStubServerBase.cpp index e5fed1fc1..31f128be8 100644 --- a/libweb3jsonrpc/WebThreeStubServerBase.cpp +++ b/libweb3jsonrpc/WebThreeStubServerBase.cpp @@ -123,16 +123,18 @@ static dev::eth::LogFilter toLogFilter(Json::Value const& _json) // commented to else if (_json["address"].isString()) filter.address(jsToAddress(_json["address"].asString())); } - if (!_json["topics"].empty()) + if (!_json["topics"].empty() && _json["topics"].isArray()) { - if (_json["topics"].isArray()) + unsigned i = 0; + for (auto t: _json["topics"]) { - for (auto i: _json["topics"]) - if (i.isString()) - filter.topic(jsToU256(i.asString())); + if (t.isArray()) + for (auto tt: t) + filter.topic(i, jsToFixed<32>(tt.asString())); + else if (t.isString()) + filter.topic(i, jsToFixed<32>(t.asString())); + i++; } - else if(_json["topics"].isString()) - filter.topic(jsToU256(_json["topics"].asString())); } return filter; } diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 1dbd6e16e..dfa7ab628 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -62,6 +62,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) if (protocolVersion != version()) disable("Invalid protocol version."); + for (auto const& m: host()->all()) + m_unseen.insert(make_pair(0, m.first)); + if (session()->id() < host()->host()->id()) sendMessages(); break; diff --git a/test/whisperTopic.cpp b/test/whisperTopic.cpp index fa7d24db8..31cefdb8a 100644 --- a/test/whisperTopic.cpp +++ b/test/whisperTopic.cpp @@ -87,4 +87,188 @@ BOOST_AUTO_TEST_CASE(topic) BOOST_REQUIRE_EQUAL(result, 1 + 9 + 25 + 49 + 81); } +BOOST_AUTO_TEST_CASE(forwarding) +{ + cnote << "Testing Whisper forwarding..."; + auto oldLogVerbosity = g_logVerbosity; + g_logVerbosity = 0; + + unsigned result = 0; + bool done = false; + + bool startedListener = false; + std::thread listener([&]() + { + setThreadName("listener"); + + // Host must be configured not to share peers. + Host ph("Listner", NetworkPreferences(50303, "", false, true)); + ph.setIdealPeerCount(0); + auto wh = ph.registerCapability(new WhisperHost()); + ph.start(); + + startedListener = true; + + /// Only interested in odd packets + auto w = wh->installWatch(BuildTopicMask("test")); + + for (int i = 0; i < 200 && !result; ++i) + { + for (auto i: wh->checkWatch(w)) + { + Message msg = wh->envelope(i).open(); + unsigned last = RLP(msg.payload()).toInt(); + cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt(); + result = last; + } + this_thread::sleep_for(chrono::milliseconds(50)); + } + }); + + bool startedForwarder = false; + std::thread forwarder([&]() + { + setThreadName("forwarder"); + + while (!startedListener) + this_thread::sleep_for(chrono::milliseconds(50)); + + // Host must be configured not to share peers. + Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); + ph.setIdealPeerCount(0); + auto wh = ph.registerCapability(new WhisperHost()); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.start(); + + this_thread::sleep_for(chrono::milliseconds(500)); + ph.connect("127.0.0.1", 50303); + + startedForwarder = true; + + /// Only interested in odd packets + auto w = wh->installWatch(BuildTopicMask("test")); + + while (!done) + { + for (auto i: wh->checkWatch(w)) + { + Message msg = wh->envelope(i).open(); + cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt(); + } + this_thread::sleep_for(chrono::milliseconds(50)); + } + }); + + while (!startedForwarder) + this_thread::sleep_for(chrono::milliseconds(50)); + + Host ph("Sender", NetworkPreferences(50300, "", false, true)); + ph.setIdealPeerCount(0); + shared_ptr wh = ph.registerCapability(new WhisperHost()); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.start(); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.connect("127.0.0.1", 50305); + + KeyPair us = KeyPair::create(); + wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); + this_thread::sleep_for(chrono::milliseconds(250)); + + listener.join(); + done = true; + forwarder.join(); + g_logVerbosity = oldLogVerbosity; + + BOOST_REQUIRE_EQUAL(result, 1); +} + +BOOST_AUTO_TEST_CASE(asyncforwarding) +{ + cnote << "Testing Whisper async forwarding..."; + auto oldLogVerbosity = g_logVerbosity; + g_logVerbosity = 2; + + unsigned result = 0; + bool done = false; + + bool startedForwarder = false; + std::thread forwarder([&]() + { + setThreadName("forwarder"); + + // Host must be configured not to share peers. + Host ph("Forwarder", NetworkPreferences(50305, "", false, true)); + ph.setIdealPeerCount(0); + auto wh = ph.registerCapability(new WhisperHost()); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.start(); + + this_thread::sleep_for(chrono::milliseconds(500)); + ph.connect("127.0.0.1", 50303); + + startedForwarder = true; + + /// Only interested in odd packets + auto w = wh->installWatch(BuildTopicMask("test")); + + while (!done) + { + for (auto i: wh->checkWatch(w)) + { + Message msg = wh->envelope(i).open(); + cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt(); + } + this_thread::sleep_for(chrono::milliseconds(50)); + } + }); + + while (!startedForwarder) + this_thread::sleep_for(chrono::milliseconds(50)); + + { + Host ph("Sender", NetworkPreferences(50300, "", false, true)); + ph.setIdealPeerCount(0); + shared_ptr wh = ph.registerCapability(new WhisperHost()); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.start(); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.connect("127.0.0.1", 50305); + + KeyPair us = KeyPair::create(); + wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); + this_thread::sleep_for(chrono::milliseconds(250)); + } + + { + Host ph("Listener", NetworkPreferences(50300, "", false, true)); + ph.setIdealPeerCount(0); + shared_ptr wh = ph.registerCapability(new WhisperHost()); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.start(); + this_thread::sleep_for(chrono::milliseconds(500)); + ph.connect("127.0.0.1", 50305); + + /// Only interested in odd packets + auto w = wh->installWatch(BuildTopicMask("test")); + + for (int i = 0; i < 200 && !result; ++i) + { + for (auto i: wh->checkWatch(w)) + { + Message msg = wh->envelope(i).open(); + unsigned last = RLP(msg.payload()).toInt(); + cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt(); + result = last; + } + this_thread::sleep_for(chrono::milliseconds(50)); + } + } + + done = true; + forwarder.join(); + g_logVerbosity = oldLogVerbosity; + + BOOST_REQUIRE_EQUAL(result, 1); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/third/MainWin.cpp b/third/MainWin.cpp index 35650f94b..eae421c24 100644 --- a/third/MainWin.cpp +++ b/third/MainWin.cpp @@ -197,21 +197,21 @@ unsigned Main::installWatch(dev::h256 _tf, WatchHandler const& _f) void Main::installWatches() { - installWatch(dev::eth::LogFilter().topic((u256)(u160)c_config).topic((u256)0), [=](LocalisedLogEntries const&){ installNameRegWatch(); }); - installWatch(dev::eth::LogFilter().topic((u256)(u160)c_config).topic((u256)1), [=](LocalisedLogEntries const&){ installCurrenciesWatch(); }); + installWatch(dev::eth::LogFilter().address(c_config).topic(0, (u256)0), [=](LocalisedLogEntries const&){ installNameRegWatch(); }); + installWatch(dev::eth::LogFilter().address(c_config).topic(0, (u256)1), [=](LocalisedLogEntries const&){ installCurrenciesWatch(); }); installWatch(dev::eth::ChainChangedFilter, [=](LocalisedLogEntries const&){ onNewBlock(); }); } void Main::installNameRegWatch() { ethereum()->uninstallWatch(m_nameRegFilter); - m_nameRegFilter = installWatch(dev::eth::LogFilter().topic(ethereum()->stateAt(c_config, 0)), [=](LocalisedLogEntries const&){ onNameRegChange(); }); + m_nameRegFilter = installWatch(dev::eth::LogFilter().address(u160(ethereum()->stateAt(c_config, 0))), [=](LocalisedLogEntries const&){ onNameRegChange(); }); } void Main::installCurrenciesWatch() { ethereum()->uninstallWatch(m_currenciesFilter); - m_currenciesFilter = installWatch(dev::eth::LogFilter().topic(ethereum()->stateAt(c_config, 1)), [=](LocalisedLogEntries const&){ onCurrenciesChange(); }); + m_currenciesFilter = installWatch(dev::eth::LogFilter().address(u160(ethereum()->stateAt(c_config, 1))), [=](LocalisedLogEntries const&){ onCurrenciesChange(); }); } void Main::installBalancesWatch() @@ -224,7 +224,7 @@ void Main::installBalancesWatch() altCoins.push_back(right160(ethereum()->stateAt(coinsAddr, i + 1))); for (auto i: m_myKeys) for (auto c: altCoins) - tf.address(c).topic((u256)(u160)i.address()); + tf.address(c).topic(0, (u256)(u160)i.address()); ethereum()->uninstallWatch(m_balancesFilter); m_balancesFilter = installWatch(tf, [=](LocalisedLogEntries const&){ onBalancesChange(); });