From d367199553545d5e93052a8704f39d3054331bbb Mon Sep 17 00:00:00 2001 From: Janus Date: Tue, 4 Sep 2018 16:57:07 +0200 Subject: [PATCH] async block headers: remove BlockHeaderInterface and Conn classes, make self.height a local --- electrum/interface.py | 208 +++++++++++++++------------------ electrum/tests/test_network.py | 132 ++++++++++----------- 2 files changed, 158 insertions(+), 182 deletions(-) diff --git a/electrum/interface.py b/electrum/interface.py index 59a87b579..763bc07d3 100644 --- a/electrum/interface.py +++ b/electrum/interface.py @@ -171,6 +171,13 @@ class Interface(PrintError): except ValueError: return None + async def get_block_header(self, height, assert_mode): + res = await asyncio.wait_for(self.session.send_request('blockchain.block.header', [height]), 1) + return blockchain.deserialize_header(bytes.fromhex(res), height) + + async def request_chunk(self, idx, tip): + return await self.network.request_chunk(idx, tip, self.session) + async def open_session(self, sslc, exit_early): header_queue = asyncio.Queue() async with NotificationSession(None, header_queue, self.host, self.port, ssl=sslc, proxy=self.proxy) as session: @@ -184,8 +191,7 @@ class Interface(PrintError): self.mark_ready() self.session = session copy_header_queue = asyncio.Queue() - conniface = Conn(self.server, session, lambda idx, tip: self.network.request_chunk(idx, tip, session)) - block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue, conniface)) + block_retriever = asyncio.get_event_loop().create_task(self.run_fetch_blocks(subscription_res, copy_header_queue)) while True: try: new_header = await asyncio.wait_for(header_queue.get(), 300) @@ -195,121 +201,109 @@ class Interface(PrintError): except concurrent.futures.TimeoutError: await asyncio.wait_for(session.send_request('server.ping'), 5) - def queue_request(self, method, params, msg_id): - pass - def close(self): self.fut.cancel() @aiosafe - async def run_fetch_blocks(self, sub_reply, replies, conniface): + async def run_fetch_blocks(self, sub_reply, replies): async with self.network.bhi_lock: - bhi = BlockHeaderInterface(conniface, self.blockchain.height()+1, self) + height = self.blockchain.height()+1 await replies.put(blockchain.deserialize_header(bfh(sub_reply['hex']), sub_reply['height'])) while True: self.network.notify('updated') item = await replies.get() async with self.network.bhi_lock: - if self.blockchain.height()-1 < item['block_height']: - await bhi.sync_until() - if self.blockchain.height() >= bhi.height and self.blockchain.check_header(item): + if self.blockchain.height() < item['block_height']-1: + _, height = await self.sync_until(height, None) + if self.blockchain.height() >= height and self.blockchain.check_header(item): # another interface amended the blockchain - self.print_error("SKIPPING HEADER", bhi.height) + self.print_error("SKIPPING HEADER", height) continue - if self.tip < bhi.height: - bhi.height = self.tip - await bhi.step(item) - self.tip = max(bhi.height, self.tip) - -class BlockHeaderInterface(PrintError): - def __init__(self, conn, height, iface): - self.height = height - self.conn = conn - self.iface = iface - - def diagnostic_name(self): - return self.conn.server + if self.tip < height: + height = self.tip + _, height = await self.step(height, item) + self.tip = max(height, self.tip) - async def sync_until(self, next_height=None): + async def sync_until(self, height, next_height=None): if next_height is None: - next_height = self.iface.tip + next_height = self.tip last = None - while last is None or self.height < next_height: - if next_height > self.height + 10: - could_connect, num_headers = await self.conn.request_chunk(self.height, next_height) - self.iface.tip = max(self.height + num_headers, self.iface.tip) + while last is None or height < next_height: + if next_height > height + 10: + could_connect, num_headers = await self.request_chunk(height, next_height) + self.tip = max(height + num_headers, self.tip) if not could_connect: - if self.height <= self.iface.network.max_checkpoint(): + if height <= self.network.max_checkpoint(): raise Exception('server chain conflicts with checkpoints or genesis') - last = await self.step() - self.iface.tip = max(self.height, self.iface.tip) + last, height = await self.step(height) + self.tip = max(height, self.tip) continue - self.height = (self.height // 2016 * 2016) + num_headers - if self.height > next_height: - assert False, (self.height, self.iface.tip) + height = (height // 2016 * 2016) + num_headers + if height > next_height: + assert False, (height, self.tip) last = 'catchup' else: - last = await self.step() - self.iface.tip = max(self.height, self.iface.tip) - return last + last, height = await self.step(height) + self.tip = max(height, self.tip) + return last, height - async def step(self, header=None): - assert self.height != 0 + async def step(self, height, header=None): + assert height != 0 if header is None: - header = await self.conn.get_block_header(self.height, 'catchup') - chain = self.iface.blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - if chain: return 'catchup' - can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + header = await self.get_block_header(height, 'catchup') + chain = self.blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) + if chain: return 'catchup', height + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) bad_header = None if not can_connect: - self.print_error("can't connect", self.height) + self.print_error("can't connect", height) #backward - bad = self.height + bad = height bad_header = header - self.height -= 1 + height -= 1 checkp = False - if self.height <= self.iface.network.max_checkpoint(): - self.height = self.iface.network.max_checkpoint() + 1 + if height <= self.network.max_checkpoint(): + height = self.network.max_checkpoint() + 1 checkp = True - header = await self.conn.get_block_header(self.height, 'backward') + header = await self.get_block_header(height, 'backward') chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) if checkp: assert can_connect or chain, (can_connect, chain) while not chain and not can_connect: - bad = self.height + bad = height bad_header = header - delta = self.iface.tip - self.height - next_height = self.iface.tip - 2 * delta + delta = self.tip - height + next_height = self.tip - 2 * delta checkp = False - if next_height <= self.iface.network.max_checkpoint(): - next_height = self.iface.network.max_checkpoint() + 1 + if next_height <= self.network.max_checkpoint(): + next_height = self.network.max_checkpoint() + 1 checkp = True - self.height = next_height + height = next_height - header = await self.conn.get_block_header(self.height, 'backward') + header = await self.get_block_header(height, 'backward') chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) - can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](self) + can_connect = blockchain.can_connect(header) if 'mock' not in header else header['mock']['connect'](height) if checkp: assert can_connect or chain, (can_connect, chain) - self.print_error("exiting backward mode at", self.height) + self.print_error("exiting backward mode at", height) if can_connect: - self.print_error("could connect", self.height) + self.print_error("could connect", height) chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) if type(can_connect) is bool: # mock - self.height += 1 - if self.height > self.iface.tip: + height += 1 + if height > self.tip: assert False - return 'catchup' - self.iface.blockchain = can_connect - self.height += 1 - self.iface.blockchain.save_header(header) - return 'catchup' + return 'catchup', height + self.blockchain = can_connect + height += 1 + self.blockchain.save_header(header) + return 'catchup', height if not chain: raise Exception("not chain") # line 931 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py @@ -318,27 +312,27 @@ class BlockHeaderInterface(PrintError): if type(chain) in [int, bool]: pass # mock else: - self.iface.blockchain = chain - good = self.height - self.height = (bad + good) // 2 - header = await self.conn.get_block_header(self.height, 'binary') + self.blockchain = chain + good = height + height = (bad + good) // 2 + header = await self.get_block_header(height, 'binary') while True: self.print_error("binary step") chain = blockchain.check_header(header) if 'mock' not in header else header['mock']['check'](header) if chain: - assert bad != self.height, (bad, self.height) - good = self.height - self.iface.blockchain = self.iface.blockchain if type(chain) in [bool, int] else chain + assert bad != height, (bad, height) + good = height + self.blockchain = self.blockchain if type(chain) in [bool, int] else chain else: - bad = self.height - assert good != self.height + bad = height + assert good != height bad_header = header if bad != good + 1: - self.height = (bad + good) // 2 - header = await self.conn.get_block_header(self.height, 'binary') + height = (bad + good) // 2 + header = await self.get_block_header(height, 'binary') continue - mock = bad_header and 'mock' in bad_header and bad_header['mock']['connect'](self) - real = not mock and self.iface.blockchain.can_connect(bad_header, check_height=False) + mock = bad_header and 'mock' in bad_header and bad_header['mock']['connect'](height) + real = not mock and self.blockchain.can_connect(bad_header, check_height=False) if not real and not mock: raise Exception('unexpected bad header during binary' + str(bad_header)) # line 948 in 8e69174374aee87d73cd2f8005fbbe87c93eee9c's network.py branch = blockchain.blockchains.get(bad) @@ -354,58 +348,48 @@ class BlockHeaderInterface(PrintError): # electrum runtime if ismocking and branch['check'](bad_header) or not ismocking and branch.check_header(bad_header): self.print_error('joining chain', bad) - self.height += 1 - return 'join' + height += 1 + return 'join', height else: if ismocking and branch['parent']['check'](header) or not ismocking and branch.parent().check_header(header): - self.print_error('reorg', bad, self.iface.tip) - self.iface.blockchain = branch.parent() if not ismocking else branch['parent'] - self.height = bad - header = await self.conn.get_block_header(self.height, 'binary') + self.print_error('reorg', bad, self.tip) + self.blockchain = branch.parent() if not ismocking else branch['parent'] + height = bad + header = await self.get_block_header(height, 'binary') else: if ismocking: - self.height = bad + 1 + height = bad + 1 self.print_error("TODO replace blockchain") - return 'conflict' + return 'conflict', height self.print_error('forkpoint conflicts with existing fork', branch.path()) branch.write(b'', 0) branch.save_header(bad_header) - self.iface.blockchain = branch - self.height = bad + 1 - return 'conflict' + self.blockchain = branch + height = bad + 1 + return 'conflict', height else: - bh = self.iface.blockchain.height() + bh = self.blockchain.height() if bh > good: - forkfun = self.iface.blockchain.fork + forkfun = self.blockchain.fork if 'mock' in bad_header: chain = bad_header['mock']['check'](bad_header) forkfun = bad_header['mock']['fork'] if 'fork' in bad_header['mock'] else forkfun else: - chain = self.iface.blockchain.check_header(bad_header) + chain = self.blockchain.check_header(bad_header) if not chain: b = forkfun(bad_header) assert bad not in blockchain.blockchains, (bad, list(blockchain.blockchains.keys())) blockchain.blockchains[bad] = b - self.iface.blockchain = b - self.height = b.forkpoint + 1 + self.blockchain = b + height = b.forkpoint + 1 assert b.forkpoint == bad - return 'fork' + return 'fork', height else: assert bh == good - if bh < self.iface.tip: + if bh < self.tip: self.print_error("catching up from %d"% (bh + 1)) - self.height = bh + 1 - return 'no_fork' - -class Conn: - def __init__(self, server, session, get_chunk): - self.server = server - self.session = session # type: aiorpcx.ClientSession - self.request_chunk = get_chunk - async def get_block_header(self, height, assert_mode): - res = await asyncio.wait_for(self.session.send_request('blockchain.block.header', [height]), 1) - return blockchain.deserialize_header(bytes.fromhex(res), height) - + height = bh + 1 + return 'no_fork', height def check_cert(host, cert): try: diff --git a/electrum/tests/test_network.py b/electrum/tests/test_network.py index 39781e64d..07b5996c9 100644 --- a/electrum/tests/test_network.py +++ b/electrum/tests/test_network.py @@ -5,12 +5,18 @@ import unittest from electrum.constants import set_regtest from electrum.simple_config import SimpleConfig from electrum import blockchain -from electrum.interface import BlockHeaderInterface +from electrum.interface import Interface -class MockConnection: - def __init__(self): +class MockInterface(Interface): + def __init__(self, config): + self.config = config + super().__init__(None, 'mock-server:50000:t', self.config.electrum_path(), None) + class FakeNetwork: + max_checkpoint = lambda: 0 + self.network = FakeNetwork self.q = asyncio.Queue() - self.server = 'mock-server' + self.blockchain = blockchain.Blockchain(self.config, 2002, None) + self.tip = 12 async def get_block_header(self, height, assert_mode): assert self.q.qsize() > 0, (height, assert_mode) item = await self.q.get() @@ -22,76 +28,63 @@ class MockConnection: class TestNetwork(unittest.TestCase): def setUp(self): self.config = SimpleConfig({'electrum_path': tempfile.mkdtemp(prefix="test_network")}) - - def blockchain_iface_pair(self, forkpoint=2002): - b = blockchain.Blockchain(self.config, forkpoint, None) - class FakeNetwork: - max_checkpoint = lambda: 0 - class FakeIface: - blockchain = b - network = FakeNetwork - tip = 12 - return FakeIface + self.interface = MockInterface(self.config) def test_new_fork(self): blockchain.blockchains = {} - conn = MockConnection() - conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - def mock_connect(block_header_iface): - return block_header_iface.height == 6 - conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1,'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) - conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1,'check':lambda x: True, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) - ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair()) - self.assertEqual('fork', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8))) - self.assertEqual(conn.q.qsize(), 0) + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(height): + return height == 6 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1,'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1,'check':lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) + self.assertEqual(self.interface.q.qsize(), 0) def test_new_can_connect_during_backward(self): blockchain.blockchains = {} - conn = MockConnection() - conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - def mock_connect(block_header_iface): - return block_header_iface.height == 2 - conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) - conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) - conn.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) - ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair()) - self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=5))) - self.assertEqual(conn.q.qsize(), 0) + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + def mock_connect(height): + return height == 2 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect, 'fork': self.mock_fork}}) + self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=5))) + self.assertEqual(self.interface.q.qsize(), 0) def mock_fork(self, bad_header): return blockchain.Blockchain(self.config, bad_header['block_height'], None) def test_new_chain_false_during_binary(self): blockchain.blockchains = {} - conn = MockConnection() - conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - mock_connect = lambda bhi: bhi.height == 3 - conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect}}) - conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}}) - conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}}) - conn.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) - ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair(1000)) - self.assertEqual('catchup', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7))) - self.assertEqual(conn.q.qsize(), 0) + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + mock_connect = lambda height: height == 3 + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: False, 'fork': self.mock_fork, 'connect': mock_connect}}) + self.interface.q.put_nowait({'block_height': 3, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(self.interface.q.qsize(), 0) def test_new_join(self): blockchain.blockchains = {7: {'check': lambda bad_header: True}} - conn = MockConnection() - conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda x: x.height == 6}}) - conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) - ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair()) - self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=7))) - self.assertEqual(conn.q.qsize(), 0) + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: True, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('join', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7))) + self.assertEqual(self.interface.q.qsize(), 0) def test_new_reorg(self): times = 0 @@ -101,17 +94,16 @@ class TestNetwork(unittest.TestCase): times += 1 return times != 1 blockchain.blockchains = {7: {'check': check, 'parent': {'check': lambda x: True}}} - conn = MockConnection() - conn.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda x: x.height == 6}}) - conn.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) - conn.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}}) - conn.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}}) - ifa = BlockHeaderInterface(conn, 8, self.blockchain_iface_pair()) - self.assertEqual('join', asyncio.get_event_loop().run_until_complete(ifa.sync_until(next_height=8))) - self.assertEqual(conn.q.qsize(), 0) + self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'backward':1, 'check': lambda x: False, 'connect': lambda height: height == 6}}) + self.interface.q.put_nowait({'block_height': 2, 'mock': {'backward':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 4, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: False}}) + self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1, 'check': lambda x: 1, 'connect': lambda x: True}}) + self.interface.q.put_nowait({'block_height': 7, 'mock': {'binary':1, 'check': lambda x: False, 'connect': lambda x: True}}) + ifa = self.interface + self.assertEqual(('join', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=8))) + self.assertEqual(self.interface.q.qsize(), 0) self.assertEqual(times, 2) if __name__=="__main__":