From ed7d8a319d779dd1fff4305d58c4404e37a56407 Mon Sep 17 00:00:00 2001 From: "John L. Jegutanis" Date: Tue, 14 Mar 2017 02:06:54 +0200 Subject: [PATCH] Refactor block parsing API --- lib/coins.py | 50 ++++++++++++++++++++------------------- lib/tx.py | 5 ++-- server/block_processor.py | 19 ++++++++------- server/db.py | 25 +++++++++++++------- 4 files changed, 56 insertions(+), 43 deletions(-) diff --git a/lib/coins.py b/lib/coins.py index 3971407..d6d4596 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -30,6 +30,7 @@ Anything coin-specific should go in this file and be subclassed where necessary for appropriate handling. ''' +from collections import namedtuple import re import struct from decimal import Decimal @@ -40,6 +41,8 @@ from lib.hash import Base58, hash160, double_sha256, hash_to_str from lib.script import ScriptPubKey from lib.tx import Deserializer, DeserializerSegWit +Block = namedtuple("Block", "header transactions") + class CoinError(Exception): '''Exception raised for coin-related errors.''' @@ -53,6 +56,8 @@ class Coin(object): RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?') VALUE_PER_COIN = 100000000 CHUNK_SIZE = 2016 + BASIC_HEADER_SIZE = 80 + STATIC_BLOCK_HEADERS = True IRC_PREFIX = None IRC_SERVER = "irc.freenode.net" IRC_PORT = 6667 @@ -232,29 +237,33 @@ class Coin(object): return header[4:36] @classmethod - def header_offset(cls, height): + def static_header_offset(cls, height): '''Given a header height return its offset in the headers file. If header sizes change at some point, this is the only code that needs updating.''' - return height * 80 + assert cls.STATIC_BLOCK_HEADERS + return height * cls.BASIC_HEADER_SIZE @classmethod - def header_len(cls, height): + def static_header_len(cls, height): '''Given a header height return its length.''' - return cls.header_offset(height + 1) - cls.header_offset(height) + return cls.static_header_offset(height + 1) \ + - cls.static_header_offset(height) @classmethod def block_header(cls, block, height): '''Returns the block header given a block and its height.''' - return block[:cls.header_len(height)] + return block[:cls.static_header_len(height)] @classmethod - def block_txs(cls, block, height): - '''Returns a list of (deserialized_tx, tx_hash) pairs given a + def block_full(cls, block, height): + '''Returns (header, [(deserialized_tx, tx_hash), ...]) given a block and its height.''' + header = cls.block_header(block, height) deserializer = cls.deserializer() - return deserializer(block[cls.header_len(height):]).read_block() + txs = deserializer(block[len(header):]).read_tx_block() + return Block(header, txs) @classmethod def decimal_value(cls, value): @@ -635,8 +644,9 @@ class FairCoin(Coin): P2PKH_VERBYTE = bytes.fromhex("5f") P2SH_VERBYTE = bytes.fromhex("24") WIF_BYTE = bytes.fromhex("df") - GENESIS_HASH=('1f701f2b8de1339dc0ec908f3fb6e9b0' - 'b870b6f20ba893e120427e42bbc048d7') + GENESIS_HASH = ('1f701f2b8de1339dc0ec908f3fb6e9b0' + 'b870b6f20ba893e120427e42bbc048d7') + BASIC_HEADER_SIZE = 108 TX_COUNT = 1000 TX_COUNT_HEIGHT = 1000 TX_PER_BLOCK = 1 @@ -650,22 +660,14 @@ class FairCoin(Coin): ] @classmethod - def header_offset(cls, height): - '''Given a header height return its offset in the headers file. - If header sizes change at some point, this is the only code - that needs updating.''' - return height * 108 - - @classmethod - def block_txs(cls, block, height): - '''Returns a list of (deserialized_tx, tx_hash) pairs given a + def block_full(cls, block, height): + '''Returns (header, [(deserialized_tx, tx_hash), ...]) given a block and its height.''' - if height == 0: - return [] - - deserializer = cls.deserializer() - return deserializer(block[cls.header_len(height):]).read_block() + if height > 0: + return cls.block_full(block, height) + else: + return Block(cls.block_header(block, height), []) @classmethod def electrum_header(cls, header, height): diff --git a/lib/tx.py b/lib/tx.py index 9869512..d84e8af 100644 --- a/lib/tx.py +++ b/lib/tx.py @@ -1,4 +1,5 @@ # Copyright (c) 2016-2017, Neil Booth +# Copyright (c) 2017, the ElectrumX authors # # All rights reserved. # @@ -105,10 +106,10 @@ class Deserializer(object): self._read_le_uint32() # locktime ), double_sha256(self.binary[start:self.cursor]) - def read_block(self): + def read_tx_block(self): '''Returns a list of (deserialized_tx, tx_hash) pairs.''' read_tx = self.read_tx - txs = [read_tx() for n in range(self._read_varint())] + txs = [read_tx() for _ in range(self._read_varint())] # Some coins have excess data beyond the end of the transactions return txs diff --git a/server/block_processor.py b/server/block_processor.py index 933cbae..1d73df2 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -1,4 +1,5 @@ # Copyright (c) 2016-2017, Neil Booth +# Copyright (c) 2017, the ElectrumX authors # # All rights reserved. # @@ -231,15 +232,15 @@ class BlockProcessor(server.db.DB): .format(len(blocks), first, self.height + 1)) return - headers = [self.coin.block_header(block, first + n) - for n, block in enumerate(blocks)] + blocks = [self.coin.block_full(block, first + n) + for n, block in enumerate(blocks)] + headers = [b.header for b in blocks] hprevs = [self.coin.header_prevhash(h) for h in headers] chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: start = time.time() - await self.controller.run_in_executor(self.advance_blocks, - blocks, headers) + await self.controller.run_in_executor(self.advance_blocks, blocks) if not self.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s' @@ -479,18 +480,18 @@ class BlockProcessor(server.db.DB): if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: self.flush(utxo_MB >= self.cache_MB * 4 // 5) - def advance_blocks(self, blocks, headers): + def advance_blocks(self, blocks): '''Synchronously advance the blocks. It is already verified they correctly connect onto our tip. ''' - block_txs = self.coin.block_txs + headers = [block.header for block in blocks] min_height = self.min_undo_height(self.daemon.cached_height()) height = self.height for block in blocks: height += 1 - undo_info = self.advance_txs(block_txs(block, height)) + undo_info = self.advance_txs(block.transactions) if height >= min_height: self.undo_infos.append((undo_info, height)) @@ -568,14 +569,14 @@ class BlockProcessor(server.db.DB): coin = self.coin for block in blocks: # Check and update self.tip - header = coin.block_header(block, self.height) + header, txs = coin.block_full(block, self.height) header_hash = coin.header_hash(header) if header_hash != self.tip: raise ChainError('backup block {} not tip {} at height {:,d}' .format(hash_to_str(header_hash), hash_to_str(self.tip), self.height)) self.tip = coin.header_prevhash(header) - self.backup_txs(coin.block_txs(block, self.height)) + self.backup_txs(txs) self.height -= 1 self.tx_counts.pop() diff --git a/server/db.py b/server/db.py index 6a3d4ef..2d65ea2 100644 --- a/server/db.py +++ b/server/db.py @@ -1,4 +1,5 @@ # Copyright (c) 2016, Neil Booth +# Copyright (c) 2017, the ElectrumX authors # # All rights reserved. # @@ -44,6 +45,13 @@ class DB(util.LoggedClass): self.env = env self.coin = env.coin + # Setup block header size handlers + if self.coin.STATIC_BLOCK_HEADERS: + self.header_offset = self.coin.static_header_offset + self.header_len = self.coin.static_header_len + else: + raise Exception("Non static headers are not supported") + self.logger.info('switching current directory to {}' .format(env.db_dir)) os.chdir(env.db_dir) @@ -191,24 +199,25 @@ class DB(util.LoggedClass): updated. These arrays are all append only, so in a crash we just pick up again from the DB height. ''' - blocks_done = len(self.headers) + blocks_done = len(headers) + height_start = fs_height + 1 new_height = fs_height + blocks_done prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert len(self.tx_hashes) == blocks_done + assert len(block_tx_hashes) == blocks_done assert len(self.tx_counts) == new_height + 1 hashes = b''.join(block_tx_hashes) assert len(hashes) % 32 == 0 assert len(hashes) // 32 == txs_done # Write the headers, tx counts, and tx hashes - offset = self.coin.header_offset(fs_height + 1) + offset = self.header_offset(height_start) self.headers_file.write(offset, b''.join(headers)) - offset = (fs_height + 1) * self.tx_counts.itemsize + offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, - self.tx_counts[fs_height + 1:].tobytes()) + self.tx_counts[height_start:].tobytes()) offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes) @@ -220,8 +229,8 @@ class DB(util.LoggedClass): raise self.DBError('{:,d} headers starting at {:,d} not on disk' .format(count, start)) if disk_count: - offset = self.coin.header_offset(start) - size = self.coin.header_offset(start + disk_count) - offset + offset = self.header_offset(start) + size = self.header_offset(start + disk_count) - offset return self.headers_file.read(offset, size) return b'' @@ -241,7 +250,7 @@ class DB(util.LoggedClass): offset = 0 headers = [] for n in range(count): - hlen = self.coin.header_len(height + n) + hlen = self.header_len(height + n) headers.append(headers_concat[offset:offset + hlen]) offset += hlen