Browse Source

migration to log filters, not finished

cl-refactor
Marek Kotewicz 10 years ago
parent
commit
3a1b83538b
  1. 17
      alethzero/MainWin.cpp
  2. 1
      alethzero/MainWin.h
  3. 92
      libethereum/Client.cpp
  4. 14
      libethereum/Client.h
  5. 20
      libethereum/Interface.h
  6. 6
      libethereum/State.cpp
  7. 2
      libethereum/State.h
  8. 100
      libweb3jsonrpc/WebThreeStubServer.cpp

17
alethzero/MainWin.cpp

@ -241,13 +241,22 @@ void Main::onKeysChanged()
installBalancesWatch();
}
unsigned Main::installWatch(dev::eth::MessageFilter const& _tf, std::function<void()> const& _f)
unsigned Main::installWatch(dev::eth::LogFilter const& _tf, std::function<void()> const& _f)
{
auto ret = ethereum()->installWatch(_tf);
m_handlers[ret] = _f;
return ret;
}
unsigned Main::installWatch(dev::eth::MessageFilter const& _tf, std::function<void()> const& _f)
{
// auto ret = ethereum()->installWatch(_tf);
// m_handlers[ret] = _f;
// return ret;
}
unsigned Main::installWatch(dev::h256 _tf, std::function<void()> const& _f)
{
auto ret = ethereum()->installWatch(_tf);
@ -263,8 +272,10 @@ void Main::uninstallWatch(unsigned _w)
void Main::installWatches()
{
installWatch(dev::eth::MessageFilter().altered(c_config, 0), [=](){ installNameRegWatch(); });
installWatch(dev::eth::MessageFilter().altered(c_config, 1), [=](){ installCurrenciesWatch(); });
installWatch(dev::eth::LogFilter().address(c_config), [=]() { installNameRegWatch(); });
installWatch(dev::eth::LogFilter().address(c_config), [=]() { installCurrenciesWatch(); });
// installWatch(dev::eth::MessageFilter().altered(c_config, 0), [=](){ installNameRegWatch(); });
// installWatch(dev::eth::MessageFilter().altered(c_config, 1), [=](){ installCurrenciesWatch(); });
installWatch(dev::eth::PendingChangedFilter, [=](){ onNewPending(); });
installWatch(dev::eth::ChainChangedFilter, [=](){ onNewBlock(); });
}

1
alethzero/MainWin.h

@ -189,6 +189,7 @@ private:
dev::u256 value() const;
dev::u256 gasPrice() const;
unsigned installWatch(dev::eth::LogFilter const& _tf, std::function<void()> const& _f);
unsigned installWatch(dev::eth::MessageFilter const& _tf, std::function<void()> const& _f);
unsigned installWatch(dev::h256 _tf, std::function<void()> const& _f);
void uninstallWatch(unsigned _w);

92
libethereum/Client.cpp

@ -159,7 +159,7 @@ void Client::clearPending()
if (!m_postMine.pending().size())
return;
for (unsigned i = 0; i < m_postMine.pending().size(); ++i)
appendFromNewPending(m_postMine.oldBloom(i), changeds);
appendFromNewPending(m_postMine.logBloom(i), changeds);
changeds.insert(PendingChangedFilter);
m_postMine = m_preMine;
}
@ -181,7 +181,7 @@ unsigned Client::installWatch(h256 _h)
return ret;
}
unsigned Client::installWatch(MessageFilter const& _f)
unsigned Client::installWatch(LogFilter const& _f)
{
lock_guard<mutex> l(m_filterLock);
@ -222,7 +222,7 @@ void Client::noteChanged(h256Set const& _filters)
}
}
void Client::appendFromNewPending(h256 _bloom, h256Set& o_changed) const
void Client::appendFromNewPending(LogBloom _bloom, h256Set& o_changed) const
{
lock_guard<mutex> l(m_filterLock);
for (pair<h256, InstalledFilter> const& i: m_filters)
@ -232,12 +232,19 @@ void Client::appendFromNewPending(h256 _bloom, h256Set& o_changed) const
void Client::appendFromNewBlock(h256 _block, h256Set& o_changed) const
{
auto d = m_bc.details(_block);
// auto d = m_bc.details(_block);
auto d = m_bc.logBlooms(_block);
lock_guard<mutex> l(m_filterLock);
for (pair<h256, InstalledFilter> const& i: m_filters)
if ((unsigned)i.second.filter.latest() >= d.number && (unsigned)i.second.filter.earliest() <= d.number && i.second.filter.matches(d.bloom))
o_changed.insert(i.first);
for (auto b: d.blooms)
if (i.second.filter.matches(b))
{
o_changed.insert(i.first);
break;
}
// if ((unsigned)i.second.filter.latest() >= d.number && (unsigned)i.second.filter.earliest() <= d.number && i.second.filter.matches(d.bloom))
// o_changed.insert(i.first);
}
void Client::setForceMining(bool _enable)
@ -440,7 +447,7 @@ void Client::doWork()
// returns h256s as blooms, once for each transaction.
cwork << "postSTATE <== TQ";
h256s newPendingBlooms = m_postMine.sync(m_tq);
h512s newPendingBlooms = m_postMine.sync(m_tq);
if (newPendingBlooms.size())
{
for (auto i: newPendingBlooms)
@ -564,9 +571,9 @@ BlockInfo Client::uncle(h256 _blockHash, unsigned _i) const
return BlockInfo::fromHeader(b[2][_i].data());
}
PastMessages Client::messages(MessageFilter const& _f) const
LogEntries Client::logs(LogFilter const& _f) const
{
PastMessages ret;
LogEntries ret;
unsigned begin = min<unsigned>(m_bc.number(), (unsigned)_f.latest());
unsigned end = min(begin, (unsigned)_f.earliest());
unsigned m = _f.max();
@ -578,23 +585,22 @@ PastMessages Client::messages(MessageFilter const& _f) const
ReadGuard l(x_stateDB);
for (unsigned i = 0; i < m_postMine.pending().size(); ++i)
{
// Might have a transaction that contains a matching message.
Manifest const& ms = m_postMine.changesFromPending(i);
PastMessages pm = _f.matches(ms, i);
if (pm.size())
// Might have a transaction that contains a matching log.
TransactionReceipt const& tr = m_postMine.receipt(i);
LogEntries le = _f.matches(tr);
if (le.size())
{
auto ts = time(0);
for (unsigned j = 0; j < pm.size() && ret.size() != m; ++j)
for (unsigned j = 0; j < le.size() && ret.size() != m; ++j)
if (s)
s--;
else
// Have a transaction that contains a matching message.
ret.insert(ret.begin(), pm[j].polish(h256(), ts, m_bc.number() + 1, m_postMine.address()));
ret.insert(ret.begin(), le[j]);
}
}
}
#if ETH_DEBUG
// fill these params
unsigned skipped = 0;
unsigned falsePos = 0;
#endif
@ -602,50 +608,28 @@ PastMessages Client::messages(MessageFilter const& _f) const
unsigned n = begin;
for (; ret.size() != m && n != end; n--, h = m_bc.details(h).parent)
{
auto d = m_bc.details(h);
#if ETH_DEBUG
int total = 0;
#endif
if (_f.matches(d.bloom))
{
// Might have a block that contains a transaction that contains a matching message.
auto bs = m_bc.blooms(h).blooms;
Manifests ms;
BlockInfo bi;
for (unsigned i = 0; i < bs.size(); ++i)
if (_f.matches(bs[i]))
//? check block bloom
for (TransactionReceipt receipt: m_bc.receipts(h).receipts)
if (_f.matches(receipt.bloom()))
{
LogEntries le = _f.matches(receipt);
if (le.size())
{
// Might have a transaction that contains a matching message.
if (ms.empty())
ms = m_bc.traces(h).traces;
Manifest const& changes = ms[i];
PastMessages pm = _f.matches(changes, i);
if (pm.size())
{
#if ETH_DEBUG
total += pm.size();
total += le.size();
#endif
if (!bi)
bi.populate(m_bc.block(h));
auto ts = bi.timestamp;
auto cb = bi.coinbaseAddress;
for (unsigned j = 0; j < pm.size() && ret.size() != m; ++j)
if (s)
s--;
else
// Have a transaction that contains a matching message.
ret.push_back(pm[j].polish(h, ts, n, cb));
for (unsigned j = 0; j < le.size() && ret.size() != m; ++j)
{
if (s)
s--;
else
ret.insert(ret.begin(), le[j]);
}
}
#if ETH_DEBUG
if (!total)
falsePos++;
}
else
skipped++;
#else
}
#endif
}
if (n == end)
break;
}

14
libethereum/Client.h

@ -79,9 +79,11 @@ static const int GenesisBlock = INT_MIN;
struct InstalledFilter
{
InstalledFilter(MessageFilter const& _f): filter(_f) {}
// InstalledFilter(MessageFilter const& _f): filter(_f) {}
// MessageFilter filter;
InstalledFilter(LogFilter const& _f): filter(_f) {}
MessageFilter filter;
LogFilter filter;
unsigned refCount = 1;
};
@ -152,14 +154,14 @@ public:
virtual bytes codeAt(Address _a, int _block) const;
virtual std::map<u256, u256> storageAt(Address _a, int _block) const;
virtual unsigned installWatch(MessageFilter const& _filter);
virtual unsigned installWatch(LogFilter const& _filter);
virtual unsigned installWatch(h256 _filterId);
virtual void uninstallWatch(unsigned _watchId);
virtual bool peekWatch(unsigned _watchId) const { std::lock_guard<std::mutex> l(m_filterLock); try { return m_watches.at(_watchId).changes != 0; } catch (...) { return false; } }
virtual bool checkWatch(unsigned _watchId) { std::lock_guard<std::mutex> l(m_filterLock); bool ret = false; try { ret = m_watches.at(_watchId).changes != 0; m_watches.at(_watchId).changes = 0; } catch (...) {} return ret; }
virtual PastMessages messages(unsigned _watchId) const { try { std::lock_guard<std::mutex> l(m_filterLock); return messages(m_filters.at(m_watches.at(_watchId).id).filter); } catch (...) { return PastMessages(); } }
virtual PastMessages messages(MessageFilter const& _filter) const;
virtual LogEntries logs(unsigned _watchId) const { try { std::lock_guard<std::mutex> l(m_filterLock); return logs(m_filters.at(m_watches.at(_watchId).id).filter); } catch (...) { return LogEntries(); } }
virtual LogEntries logs(LogFilter const& _filter) const;
// [EXTRA API]:
@ -259,7 +261,7 @@ private:
/// Collate the changed filters for the bloom filter of the given pending transaction.
/// Insert any filters that are activated into @a o_changed.
void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const;
void appendFromNewPending(LogBloom _pendingTransactionBloom, h256Set& o_changed) const;
/// Collate the changed filters for the hash of the given block.
/// Insert any filters that are activated into @a o_changed.

20
libethereum/Interface.h

@ -84,13 +84,18 @@ public:
virtual bytes codeAt(Address _a, int _block) const = 0;
virtual std::map<u256, u256> storageAt(Address _a, int _block) const = 0;
// [MESSAGE API]
virtual PastMessages messages(unsigned _watchId) const = 0;
virtual PastMessages messages(MessageFilter const& _filter) const = 0;
// // [MESSAGE API]
//
// virtual PastMessages messages(unsigned _watchId) const = 0;
// virtual PastMessages messages(MessageFilter const& _filter) const = 0;
// [LOGS API]
virtual LogEntries logs(unsigned _watchId) const = 0;
virtual LogEntries logs(LogFilter const& _filter) const = 0;
/// Install, uninstall and query watches.
virtual unsigned installWatch(MessageFilter const& _filter) = 0;
virtual unsigned installWatch(LogFilter const& _filter) = 0;
virtual unsigned installWatch(h256 _filterId) = 0;
virtual void uninstallWatch(unsigned _watchId) = 0;
virtual bool peekWatch(unsigned _watchId) const = 0;
@ -175,12 +180,13 @@ class Watch: public boost::noncopyable
public:
Watch() {}
Watch(Interface& _c, h256 _f): m_c(&_c), m_id(_c.installWatch(_f)) {}
Watch(Interface& _c, MessageFilter const& _tf): m_c(&_c), m_id(_c.installWatch(_tf)) {}
Watch(Interface& _c, LogFilter const& _tf): m_c(&_c), m_id(_c.installWatch(_tf)) {}
~Watch() { if (m_c) m_c->uninstallWatch(m_id); }
bool check() { return m_c ? m_c->checkWatch(m_id) : false; }
bool peek() { return m_c ? m_c->peekWatch(m_id) : false; }
PastMessages messages() const { return m_c->messages(m_id); }
// PastMessages messages() const { return m_c->messages(m_id); }
LogEntries logs() const { return m_c->logs(m_id); }
private:
Interface* m_c = nullptr;

6
libethereum/State.cpp

@ -534,10 +534,10 @@ bool State::cull(TransactionQueue& _tq) const
return ret;
}
h256s State::sync(TransactionQueue& _tq, bool* o_transactionQueueChanged)
h512s State::sync(TransactionQueue& _tq, bool* o_transactionQueueChanged)
{
// TRANSACTIONS
h256s ret;
h512s ret;
auto ts = _tq.transactions();
for (int goodTxs = 1; goodTxs;)
@ -552,7 +552,7 @@ h256s State::sync(TransactionQueue& _tq, bool* o_transactionQueueChanged)
uncommitToMine();
// boost::timer t;
execute(i.second);
ret.push_back(m_receipts.back().changes().bloom());
ret.push_back(m_receipts.back().bloom());
_tq.noteGood(i);
++goodTxs;
// cnote << "TX took:" << t.elapsed() * 1000;

2
libethereum/State.h

@ -147,7 +147,7 @@ public:
/// @returns a list of bloom filters one for each transaction placed from the queue into the state.
/// @a o_transactionQueueChanged boolean pointer, the value of which will be set to true if the transaction queue
/// changed and the pointer is non-null
h256s sync(TransactionQueue& _tq, bool* o_transactionQueueChanged = nullptr);
h512s sync(TransactionQueue& _tq, bool* o_transactionQueueChanged = nullptr);
/// Like sync but only operate on _tq, killing the invalid/old ones.
bool cull(TransactionQueue& _tq) const;

100
libweb3jsonrpc/WebThreeStubServer.cpp

@ -59,34 +59,6 @@ static Json::Value toJson(dev::eth::BlockInfo const& _bi)
return res;
}
static Json::Value toJson(dev::eth::PastMessage const& _t)
{
Json::Value res;
res["input"] = jsFromBinary(_t.input);
res["output"] = jsFromBinary(_t.output);
res["to"] = toJS(_t.to);
res["from"] = toJS(_t.from);
res["value"] = jsToDecimal(toJS(_t.value));
res["origin"] = toJS(_t.origin);
res["timestamp"] = toJS(_t.timestamp);
res["coinbase"] = toJS(_t.coinbase);
res["block"] = toJS(_t.block);
Json::Value path;
for (int i: _t.path)
path.append(i);
res["path"] = path;
res["number"] = (int)_t.number;
return res;
}
static Json::Value toJson(dev::eth::PastMessages const& _pms)
{
Json::Value res;
for (dev::eth::PastMessage const& t: _pms)
res.append(toJson(t));
return res;
}
static Json::Value toJson(dev::eth::Transaction const& _t)
{
Json::Value res;
@ -111,15 +83,7 @@ static Json::Value toJson(dev::eth::LogEntry const& _e)
return res;
}
static Json::Value toJson(std::map<u256, u256> const& _storage)
{
Json::Value res(Json::objectValue);
for (auto i: _storage)
res[toJS(i.first)] = toJS(i.second);
return res;
}
/*static*/ Json::Value toJson(dev::eth::LogEntries const& _es) // commented to avoid warning. Uncomment once in use @ poC-7.
static Json::Value toJson(dev::eth::LogEntries const& _es) // commented to avoid warning. Uncomment once in use @ poC-7.
{
Json::Value res;
for (dev::eth::LogEntry const& e: _es)
@ -127,59 +91,15 @@ static Json::Value toJson(std::map<u256, u256> const& _storage)
return res;
}
static dev::eth::MessageFilter toMessageFilter(Json::Value const& _json)
static Json::Value toJson(std::map<u256, u256> const& _storage)
{
dev::eth::MessageFilter filter;
if (!_json.isObject() || _json.empty())
return filter;
if (_json["earliest"].isInt())
filter.withEarliest(_json["earliest"].asInt());
if (_json["latest"].isInt())
filter.withLatest(_json["lastest"].asInt());
if (_json["max"].isInt())
filter.withMax(_json["max"].asInt());
if (_json["skip"].isInt())
filter.withSkip(_json["skip"].asInt());
if (!_json["from"].empty())
{
if (_json["from"].isArray())
{
for (auto i : _json["from"])
if (i.isString())
filter.from(jsToAddress(i.asString()));
}
else if (_json["from"].isString())
filter.from(jsToAddress(_json["from"].asString()));
}
if (!_json["to"].empty())
{
if (_json["to"].isArray())
{
for (auto i : _json["to"])
if (i.isString())
filter.to(jsToAddress(i.asString()));
}
else if (_json["to"].isString())
filter.to(jsToAddress(_json["to"].asString()));
}
if (!_json["altered"].empty())
{
if (_json["altered"].isArray())
for (auto i: _json["altered"])
if (i.isObject())
filter.altered(jsToAddress(i["id"].asString()), jsToU256(i["at"].asString()));
else
filter.altered((jsToAddress(i.asString())));
else if (_json["altered"].isObject())
filter.altered(jsToAddress(_json["altered"]["id"].asString()), jsToU256(_json["altered"]["at"].asString()));
else
filter.altered(jsToAddress(_json["altered"].asString()));
}
return filter;
Json::Value res(Json::objectValue);
for (auto i: _storage)
res[toJS(i.first)] = toJS(i.second);
return res;
}
/*static*/ dev::eth::LogFilter toLogFilter(Json::Value const& _json) // commented to avoid warning. Uncomment once in use @ PoC-7.
static dev::eth::LogFilter toLogFilter(Json::Value const& _json) // commented to avoid warning. Uncomment once in use @ PoC-7.
{
dev::eth::LogFilter filter;
if (!_json.isObject() || _json.empty())
@ -478,7 +398,8 @@ Json::Value WebThreeStubServer::eth_getMessages(int const& _id)
{
if (!client())
return Json::Value();
return toJson(client()->messages(_id));
// return toJson(client()->messages(_id));
return toJson(client()->logs(_id));
}
std::string WebThreeStubServer::db_getString(std::string const& _name, std::string const& _key)
@ -509,7 +430,8 @@ int WebThreeStubServer::eth_newFilter(Json::Value const& _json)
unsigned ret = -1;
if (!client())
return ret;
ret = client()->installWatch(toMessageFilter(_json));
// ret = client()->installWatch(toMessageFilter(_json));
ret = client()->installWatch(toLogFilter(_json));
return ret;
}

Loading…
Cancel
Save