From 2187615c08e5209a5eb649c367406b6eacfe0c13 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 31 Aug 2018 19:51:32 +0200 Subject: [PATCH] verifier: request proofs in batches --- electrum/network.py | 1 - electrum/verifier.py | 51 ++++++++++++++++++++++---------------------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/electrum/network.py b/electrum/network.py index 58ad60692..f0aa4a61f 100644 --- a/electrum/network.py +++ b/electrum/network.py @@ -719,7 +719,6 @@ class Network(PrintError): pass async def get_merkle_for_transaction(self, tx_hash, tx_height): - print("getting merkle for transaction", tx_hash, tx_height) return await self.interface.session.send_request('blockchain.transaction.get_merkle', [tx_hash, tx_height]) def broadcast_transaction(self, tx): diff --git a/electrum/verifier.py b/electrum/verifier.py index 61642364e..39ecf82af 100644 --- a/electrum/verifier.py +++ b/electrum/verifier.py @@ -24,6 +24,8 @@ import asyncio from typing import Sequence, Optional +from aiorpcx import TaskGroup + from .util import ThreadJob, bh2u, VerifiedTxInfo, aiosafe from .bitcoin import Hash, hash_decode, hash_encode from .transaction import Transaction @@ -49,10 +51,10 @@ class SPV(ThreadJob): @aiosafe async def main(self): while True: - await self.run() + await self._request_proofs() await asyncio.sleep(1) - async def run(self): + async def _request_proofs(self): blockchain = self.network.blockchain() if not blockchain: self.print_error("no blockchain") @@ -60,33 +62,30 @@ class SPV(ThreadJob): local_height = self.network.get_local_height() unverified = self.wallet.get_unverified_txs() - #print("verifier run", len(unverified)) - for tx_hash, tx_height in unverified.items(): - # do not request merkle branch before headers are available - if tx_height <= 0 or tx_height > local_height: - continue - - header = blockchain.read_header(tx_height) - if header is None: - index = tx_height // 2016 - if index < len(blockchain.checkpoints): - await self.network.request_chunk(tx_height, None) - elif (tx_hash not in self.requested_merkle - and tx_hash not in self.merkle_roots): - self.print_error('requested merkle', tx_hash) - self.requested_merkle.add(tx_hash) - self.verify_merkle(tx_hash, await self.network.get_merkle_for_transaction( - tx_hash, - tx_height - )) + + async with TaskGroup() as group: + for tx_hash, tx_height in unverified.items(): + # do not request merkle branch before headers are available + if tx_height <= 0 or tx_height > local_height: + continue + + header = blockchain.read_header(tx_height) + if header is None: + index = tx_height // 2016 + if index < len(blockchain.checkpoints): + await group.spawn(self.network.request_chunk, tx_height, None) + elif (tx_hash not in self.requested_merkle + and tx_hash not in self.merkle_roots): + self.print_error('requested merkle', tx_hash) + self.requested_merkle.add(tx_hash) + await group.spawn(self._request_and_verify_single_proof, tx_hash, tx_height) if self.network.blockchain() != self.blockchain: self.blockchain = self.network.blockchain() - self.undo_verifications() + self._undo_verifications() - def verify_merkle(self, tx_hash, merkle): - if self.wallet.verifier is None: - return # we have been killed, this was just an orphan callback + async def _request_and_verify_single_proof(self, tx_hash, tx_height): + merkle = await self.network.get_merkle_for_transaction(tx_hash, tx_height) # Verify the hash of the server-provided merkle branch to a # transaction matches the merkle root of its block tx_height = merkle.get('block_height') @@ -143,7 +142,7 @@ class SPV(ThreadJob): else: raise InnerNodeOfSpvProofIsValidTx() - def undo_verifications(self): + def _undo_verifications(self): height = self.blockchain.get_forkpoint() tx_hashes = self.wallet.undo_verifications(self.blockchain, height) for tx_hash in tx_hashes: