Browse Source

LogFilter supports new, better, filter mechanism.

Exposed to JS API.
cl-refactor
Gav Wood 10 years ago
parent
commit
aa094e5240
  1. 2
      alethzero/MainWin.cpp
  2. 23
      libethereum/LogFilter.cpp
  3. 4
      libethereum/LogFilter.h
  4. 2
      libsolidity/CompilerStack.cpp
  5. 16
      libweb3jsonrpc/WebThreeStubServerBase.cpp
  6. 3
      libwhisper/WhisperPeer.cpp
  7. 184
      test/whisperTopic.cpp
  8. 10
      third/MainWin.cpp

2
alethzero/MainWin.cpp

@ -311,7 +311,7 @@ void Main::installBalancesWatch()
altCoins.push_back(right160(ethereum()->stateAt(coinsAddr, i + 1)));
for (auto i: m_myKeys)
for (auto c: altCoins)
tf.address(c).topic(h256(i.address(), h256::AlignRight));
tf.address(c).topic(0, h256(i.address(), h256::AlignRight));
uninstallWatch(m_balancesFilter);
m_balancesFilter = installWatch(tf, [=](LocalisedLogEntries const&){ onBalancesChange(); });

23
libethereum/LogFilter.cpp

@ -43,20 +43,21 @@ bool LogFilter::matches(LogBloom _bloom) const
{
if (m_addresses.size())
{
for (auto i: m_addresses)
for (auto const& i: m_addresses)
if (_bloom.containsBloom<3>(dev::sha3(i)))
goto OK1;
return false;
}
OK1:
if (m_topics.size())
{
for (auto i: m_topics)
if (_bloom.containsBloom<3>(dev::sha3(i)))
goto OK2;
return false;
}
OK2:
for (auto const& t: m_topics)
if (t.size())
{
for (auto const& i: t)
if (_bloom.containsBloom<3>(dev::sha3(i)))
goto OK2;
return false;
OK2:;
}
return true;
}
@ -73,8 +74,8 @@ LogEntries LogFilter::matches(TransactionReceipt const& _m) const
{
if (!m_addresses.empty() && !m_addresses.count(e.address))
goto continue2;
for (auto const& t: m_topics)
if (!std::count(e.topics.begin(), e.topics.end(), t))
for (unsigned i = 0; i < 4; ++i)
if (!m_topics[i].empty() && (e.topics.size() < i || !m_topics[i].count(e.topics[i])))
goto continue2;
ret.push_back(e);
continue2:;

4
libethereum/LogFilter.h

@ -50,7 +50,7 @@ public:
LogEntries matches(TransactionReceipt const& _r) const;
LogFilter address(Address _a) { m_addresses.insert(_a); return *this; }
LogFilter topic(h256 const& _t) { m_topics.insert(_t); return *this; }
LogFilter topic(unsigned _index, h256 const& _t) { if (_index < 4) m_topics[_index].insert(_t); return *this; }
LogFilter withMax(unsigned _m) { m_max = _m; return *this; }
LogFilter withSkip(unsigned _m) { m_skip = _m; return *this; }
LogFilter withEarliest(int _e) { m_earliest = _e; return *this; }
@ -58,7 +58,7 @@ public:
private:
AddressSet m_addresses;
h256Set m_topics;
std::array<h256Set, 4> m_topics;
int m_earliest = 0;
int m_latest = -1;
unsigned m_max;

2
libsolidity/CompilerStack.cpp

@ -145,7 +145,7 @@ string CompilerStack::expanded(string const& _sourceCode)
{ "CoinReg", "contract CoinReg{function count()constant returns(uint256 r){}function info(uint256 i)constant returns(address addr,string3 name,uint256 denom){}function register(string3 name,uint256 denom){}function unregister(){}}" },
{ "coin", "#require CoinReg\ncontract coin {function coin(string3 name, uint denom) {CoinReg(Config().lookup(3)).register(name, denom);}}" },
{ "service", "#require Config\ncontract service{function service(uint _n){Config().register(_n, this);}}" },
{ "owned", "contract owned{function owned(){owner = msg.sender;}address owner;}" },
{ "owned", "contract owned{function owned(){owner = msg.sender;}modifier onlyowner(){if(msg.sender==owner)_}address owner;}" },
{ "mortal", "#require owned\ncontract mortal is owned {function kill() { if (msg.sender == owner) suicide(owner); }}" },
{ "NameReg", "contract NameReg{function register(string32 name){}function addressOf(string32 name)constant returns(address addr){}function unregister(){}function nameOf(address addr)constant returns(string32 name){}}" },
{ "named", "#require Config NameReg\ncontract named {function named(string32 name) {NameReg(Config().lookup(1)).register(name);}}" },

16
libweb3jsonrpc/WebThreeStubServerBase.cpp

@ -123,16 +123,18 @@ static dev::eth::LogFilter toLogFilter(Json::Value const& _json) // commented to
else if (_json["address"].isString())
filter.address(jsToAddress(_json["address"].asString()));
}
if (!_json["topics"].empty())
if (!_json["topics"].empty() && _json["topics"].isArray())
{
if (_json["topics"].isArray())
unsigned i = 0;
for (auto t: _json["topics"])
{
for (auto i: _json["topics"])
if (i.isString())
filter.topic(jsToU256(i.asString()));
if (t.isArray())
for (auto tt: t)
filter.topic(i, jsToFixed<32>(tt.asString()));
else if (t.isString())
filter.topic(i, jsToFixed<32>(t.asString()));
i++;
}
else if(_json["topics"].isString())
filter.topic(jsToU256(_json["topics"].asString()));
}
return filter;
}

3
libwhisper/WhisperPeer.cpp

@ -62,6 +62,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
if (protocolVersion != version())
disable("Invalid protocol version.");
for (auto const& m: host()->all())
m_unseen.insert(make_pair(0, m.first));
if (session()->id() < host()->host()->id())
sendMessages();
break;

184
test/whisperTopic.cpp

@ -87,4 +87,188 @@ BOOST_AUTO_TEST_CASE(topic)
BOOST_REQUIRE_EQUAL(result, 1 + 9 + 25 + 49 + 81);
}
BOOST_AUTO_TEST_CASE(forwarding)
{
cnote << "Testing Whisper forwarding...";
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 0;
unsigned result = 0;
bool done = false;
bool startedListener = false;
std::thread listener([&]()
{
setThreadName("listener");
// Host must be configured not to share peers.
Host ph("Listner", NetworkPreferences(50303, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
ph.start();
startedListener = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
for (int i = 0; i < 200 && !result; ++i)
{
for (auto i: wh->checkWatch(w))
{
Message msg = wh->envelope(i).open();
unsigned last = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
result = last;
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
bool startedForwarder = false;
std::thread forwarder([&]()
{
setThreadName("forwarder");
while (!startedListener)
this_thread::sleep_for(chrono::milliseconds(50));
// Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303);
startedForwarder = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
while (!done)
{
for (auto i: wh->checkWatch(w))
{
Message msg = wh->envelope(i).open();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Sender", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
this_thread::sleep_for(chrono::milliseconds(250));
listener.join();
done = true;
forwarder.join();
g_logVerbosity = oldLogVerbosity;
BOOST_REQUIRE_EQUAL(result, 1);
}
BOOST_AUTO_TEST_CASE(asyncforwarding)
{
cnote << "Testing Whisper async forwarding...";
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 2;
unsigned result = 0;
bool done = false;
bool startedForwarder = false;
std::thread forwarder([&]()
{
setThreadName("forwarder");
// Host must be configured not to share peers.
Host ph("Forwarder", NetworkPreferences(50305, "", false, true));
ph.setIdealPeerCount(0);
auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50303);
startedForwarder = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
while (!done)
{
for (auto i: wh->checkWatch(w))
{
Message msg = wh->envelope(i).open();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50));
{
Host ph("Sender", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
KeyPair us = KeyPair::create();
wh->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
this_thread::sleep_for(chrono::milliseconds(250));
}
{
Host ph("Listener", NetworkPreferences(50300, "", false, true));
ph.setIdealPeerCount(0);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.connect("127.0.0.1", 50305);
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
for (int i = 0; i < 200 && !result; ++i)
{
for (auto i: wh->checkWatch(w))
{
Message msg = wh->envelope(i).open();
unsigned last = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
result = last;
}
this_thread::sleep_for(chrono::milliseconds(50));
}
}
done = true;
forwarder.join();
g_logVerbosity = oldLogVerbosity;
BOOST_REQUIRE_EQUAL(result, 1);
}
BOOST_AUTO_TEST_SUITE_END()

10
third/MainWin.cpp

@ -197,21 +197,21 @@ unsigned Main::installWatch(dev::h256 _tf, WatchHandler const& _f)
void Main::installWatches()
{
installWatch(dev::eth::LogFilter().topic((u256)(u160)c_config).topic((u256)0), [=](LocalisedLogEntries const&){ installNameRegWatch(); });
installWatch(dev::eth::LogFilter().topic((u256)(u160)c_config).topic((u256)1), [=](LocalisedLogEntries const&){ installCurrenciesWatch(); });
installWatch(dev::eth::LogFilter().address(c_config).topic(0, (u256)0), [=](LocalisedLogEntries const&){ installNameRegWatch(); });
installWatch(dev::eth::LogFilter().address(c_config).topic(0, (u256)1), [=](LocalisedLogEntries const&){ installCurrenciesWatch(); });
installWatch(dev::eth::ChainChangedFilter, [=](LocalisedLogEntries const&){ onNewBlock(); });
}
void Main::installNameRegWatch()
{
ethereum()->uninstallWatch(m_nameRegFilter);
m_nameRegFilter = installWatch(dev::eth::LogFilter().topic(ethereum()->stateAt(c_config, 0)), [=](LocalisedLogEntries const&){ onNameRegChange(); });
m_nameRegFilter = installWatch(dev::eth::LogFilter().address(u160(ethereum()->stateAt(c_config, 0))), [=](LocalisedLogEntries const&){ onNameRegChange(); });
}
void Main::installCurrenciesWatch()
{
ethereum()->uninstallWatch(m_currenciesFilter);
m_currenciesFilter = installWatch(dev::eth::LogFilter().topic(ethereum()->stateAt(c_config, 1)), [=](LocalisedLogEntries const&){ onCurrenciesChange(); });
m_currenciesFilter = installWatch(dev::eth::LogFilter().address(u160(ethereum()->stateAt(c_config, 1))), [=](LocalisedLogEntries const&){ onCurrenciesChange(); });
}
void Main::installBalancesWatch()
@ -224,7 +224,7 @@ void Main::installBalancesWatch()
altCoins.push_back(right160(ethereum()->stateAt(coinsAddr, i + 1)));
for (auto i: m_myKeys)
for (auto c: altCoins)
tf.address(c).topic((u256)(u160)i.address());
tf.address(c).topic(0, (u256)(u160)i.address());
ethereum()->uninstallWatch(m_balancesFilter);
m_balancesFilter = installWatch(tf, [=](LocalisedLogEntries const&){ onBalancesChange(); });

Loading…
Cancel
Save