
Refactor block parsing API

Branch: master
Author: John L. Jegutanis, 8 years ago
Committed by: Neil Booth
Commit: ed7d8a319d

Changed files:
  lib/coins.py               (48 changed lines)
  lib/tx.py                   (5 changed lines)
  server/block_processor.py  (17 changed lines)
  server/db.py               (25 changed lines)
lib/coins.py (48 changed lines)

@@ -30,6 +30,7 @@ Anything coin-specific should go in this file and be subclassed where
 necessary for appropriate handling.
 '''
+from collections import namedtuple
 import re
 import struct
 from decimal import Decimal
@@ -40,6 +41,8 @@ from lib.hash import Base58, hash160, double_sha256, hash_to_str
 from lib.script import ScriptPubKey
 from lib.tx import Deserializer, DeserializerSegWit

+Block = namedtuple("Block", "header transactions")
+

 class CoinError(Exception):
     '''Exception raised for coin-related errors.'''
@@ -53,6 +56,8 @@ class Coin(object):
     RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?')
     VALUE_PER_COIN = 100000000
     CHUNK_SIZE = 2016
+    BASIC_HEADER_SIZE = 80
+    STATIC_BLOCK_HEADERS = True
     IRC_PREFIX = None
     IRC_SERVER = "irc.freenode.net"
     IRC_PORT = 6667
@@ -232,29 +237,33 @@ class Coin(object):
         return header[4:36]

     @classmethod
-    def header_offset(cls, height):
+    def static_header_offset(cls, height):
         '''Given a header height return its offset in the headers file.

         If header sizes change at some point, this is the only code
         that needs updating.'''
-        return height * 80
+        assert cls.STATIC_BLOCK_HEADERS
+        return height * cls.BASIC_HEADER_SIZE

     @classmethod
-    def header_len(cls, height):
+    def static_header_len(cls, height):
         '''Given a header height return its length.'''
-        return cls.header_offset(height + 1) - cls.header_offset(height)
+        return cls.static_header_offset(height + 1) \
+               - cls.static_header_offset(height)

     @classmethod
     def block_header(cls, block, height):
         '''Returns the block header given a block and its height.'''
-        return block[:cls.header_len(height)]
+        return block[:cls.static_header_len(height)]

     @classmethod
-    def block_txs(cls, block, height):
-        '''Returns a list of (deserialized_tx, tx_hash) pairs given a
-        block and its height.'''
+    def block_full(cls, block, height):
+        '''Returns (header, [(deserialized_tx, tx_hash), ...]) given a
+        block and its height.'''
+        header = cls.block_header(block, height)
         deserializer = cls.deserializer()
-        return deserializer(block[cls.header_len(height):]).read_block()
+        txs = deserializer(block[len(header):]).read_tx_block()
+        return Block(header, txs)

     @classmethod
     def decimal_value(cls, value):
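
For orientation only (not part of the commit), a minimal sketch of how the reworked API is consumed; coin, raw_block, height and handle are assumed stand-ins for a Coin subclass, data fetched from the daemon and a hypothetical consumer:

    # Illustrative sketch, not from this commit.
    block = coin.block_full(raw_block, height)   # Block namedtuple
    header = block.header                        # raw header bytes
    for tx, tx_hash in block.transactions:       # (deserialized_tx, tx_hash) pairs
        handle(tx, tx_hash)                      # hypothetical consumer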
@@ -635,8 +644,9 @@ class FairCoin(Coin):
     P2PKH_VERBYTE = bytes.fromhex("5f")
     P2SH_VERBYTE = bytes.fromhex("24")
     WIF_BYTE = bytes.fromhex("df")
-    GENESIS_HASH=('1f701f2b8de1339dc0ec908f3fb6e9b0'
-                  'b870b6f20ba893e120427e42bbc048d7')
+    GENESIS_HASH = ('1f701f2b8de1339dc0ec908f3fb6e9b0'
+                    'b870b6f20ba893e120427e42bbc048d7')
+    BASIC_HEADER_SIZE = 108
     TX_COUNT = 1000
     TX_COUNT_HEIGHT = 1000
     TX_PER_BLOCK = 1
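
Because FairCoin now only declares BASIC_HEADER_SIZE = 108, the generic static_header_offset/static_header_len on Coin cover its 108-byte headers, which is why the per-coin header_offset override is dropped in the next hunk. A quick illustrative check with assumed heights (not from the commit):

    # Illustrative only: offsets follow height * BASIC_HEADER_SIZE.
    FairCoin.static_header_offset(5)   # -> 540  (5 * 108)
    FairCoin.static_header_len(5)      # -> 108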
@@ -650,22 +660,14 @@ class FairCoin(Coin):
     ]

     @classmethod
-    def header_offset(cls, height):
-        '''Given a header height return its offset in the headers file.
-
-        If header sizes change at some point, this is the only code
-        that needs updating.'''
-        return height * 108
-
-    @classmethod
-    def block_txs(cls, block, height):
-        '''Returns a list of (deserialized_tx, tx_hash) pairs given a
-        block and its height.'''
-        if height == 0:
-            return []
-        else:
-            deserializer = cls.deserializer()
-            return deserializer(block[cls.header_len(height):]).read_block()
+    def block_full(cls, block, height):
+        '''Returns (header, [(deserialized_tx, tx_hash), ...]) given a
+        block and its height.'''
+        if height > 0:
+            return super().block_full(block, height)
+        return Block(cls.block_header(block, height), [])

     @classmethod
     def electrum_header(cls, header, height):

lib/tx.py (5 changed lines)

@@ -1,4 +1,5 @@
 # Copyright (c) 2016-2017, Neil Booth
+# Copyright (c) 2017, the ElectrumX authors
 #
 # All rights reserved.
 #
@@ -105,10 +106,10 @@ class Deserializer(object):
             self._read_le_uint32()  # locktime
         ), double_sha256(self.binary[start:self.cursor])

-    def read_block(self):
+    def read_tx_block(self):
         '''Returns a list of (deserialized_tx, tx_hash) pairs.'''
         read_tx = self.read_tx
-        txs = [read_tx() for n in range(self._read_varint())]
+        txs = [read_tx() for _ in range(self._read_varint())]
         # Some coins have excess data beyond the end of the transactions
         return txs
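
For context (not shown in this diff), a rough sketch of how the renamed method is driven from the Coin layer, assuming raw is a full serialized block and header its header bytes:

    # Illustrative only: the transaction data follows the header.
    deserializer = Deserializer(raw[len(header):])
    txs = deserializer.read_tx_block()   # [(deserialized_tx, tx_hash), ...]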

server/block_processor.py (17 changed lines)

@@ -1,4 +1,5 @@
 # Copyright (c) 2016-2017, Neil Booth
+# Copyright (c) 2017, the ElectrumX authors
 #
 # All rights reserved.
 #
@@ -231,15 +232,15 @@ class BlockProcessor(server.db.DB):
                              .format(len(blocks), first, self.height + 1))
            return

-        headers = [self.coin.block_header(block, first + n)
-                   for n, block in enumerate(blocks)]
+        blocks = [self.coin.block_full(block, first + n)
+                  for n, block in enumerate(blocks)]
+        headers = [b.header for b in blocks]
         hprevs = [self.coin.header_prevhash(h) for h in headers]
         chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]

         if hprevs == chain:
             start = time.time()
-            await self.controller.run_in_executor(self.advance_blocks,
-                                                  blocks, headers)
+            await self.controller.run_in_executor(self.advance_blocks, blocks)
             if not self.first_sync:
                 s = '' if len(blocks) == 1 else 's'
                 self.logger.info('processed {:,d} block{} in {:.1f}s'
@@ -479,18 +480,18 @@ class BlockProcessor(server.db.DB):
         if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5:
             self.flush(utxo_MB >= self.cache_MB * 4 // 5)

-    def advance_blocks(self, blocks, headers):
+    def advance_blocks(self, blocks):
         '''Synchronously advance the blocks.

         It is already verified they correctly connect onto our tip.
         '''
-        block_txs = self.coin.block_txs
+        headers = [block.header for block in blocks]
         min_height = self.min_undo_height(self.daemon.cached_height())
         height = self.height

         for block in blocks:
             height += 1
-            undo_info = self.advance_txs(block_txs(block, height))
+            undo_info = self.advance_txs(block.transactions)
             if height >= min_height:
                 self.undo_infos.append((undo_info, height))
@@ -568,14 +569,14 @@ class BlockProcessor(server.db.DB):
         coin = self.coin
         for block in blocks:
             # Check and update self.tip
-            header = coin.block_header(block, self.height)
+            header, txs = coin.block_full(block, self.height)
             header_hash = coin.header_hash(header)
             if header_hash != self.tip:
                 raise ChainError('backup block {} not tip {} at height {:,d}'
                                  .format(hash_to_str(header_hash),
                                          hash_to_str(self.tip), self.height))
             self.tip = coin.header_prevhash(header)
-            self.backup_txs(coin.block_txs(block, self.height))
+            self.backup_txs(txs)
             self.height -= 1
             self.tx_counts.pop()

server/db.py (25 changed lines)

@@ -1,4 +1,5 @@
 # Copyright (c) 2016, Neil Booth
+# Copyright (c) 2017, the ElectrumX authors
 #
 # All rights reserved.
 #
@@ -44,6 +45,13 @@ class DB(util.LoggedClass):
         self.env = env
         self.coin = env.coin

+        # Setup block header size handlers
+        if self.coin.STATIC_BLOCK_HEADERS:
+            self.header_offset = self.coin.static_header_offset
+            self.header_len = self.coin.static_header_len
+        else:
+            raise Exception("Non static headers are not supported")
+
         self.logger.info('switching current directory to {}'
                          .format(env.db_dir))
         os.chdir(env.db_dir)
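
A small worked example (assumed values, not from the commit) of what the bound handlers compute for a static-header coin using the default 80-byte header size:

    # Illustrative only, for a DB instance of such a coin:
    db.header_offset(100)   # -> 8000  (100 * 80)
    db.header_len(100)      # -> 80    (offset(101) - offset(100))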
@@ -191,24 +199,25 @@ class DB(util.LoggedClass):
         updated.  These arrays are all append only, so in a crash we
         just pick up again from the DB height.
         '''
-        blocks_done = len(self.headers)
+        blocks_done = len(headers)
+        height_start = fs_height + 1
         new_height = fs_height + blocks_done
         prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
         cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
         txs_done = cur_tx_count - prior_tx_count

-        assert len(self.tx_hashes) == blocks_done
+        assert len(block_tx_hashes) == blocks_done
         assert len(self.tx_counts) == new_height + 1
         hashes = b''.join(block_tx_hashes)
         assert len(hashes) % 32 == 0
         assert len(hashes) // 32 == txs_done

         # Write the headers, tx counts, and tx hashes
-        offset = self.coin.header_offset(fs_height + 1)
+        offset = self.header_offset(height_start)
         self.headers_file.write(offset, b''.join(headers))
-        offset = (fs_height + 1) * self.tx_counts.itemsize
+        offset = height_start * self.tx_counts.itemsize
         self.tx_counts_file.write(offset,
-                                  self.tx_counts[fs_height + 1:].tobytes())
+                                  self.tx_counts[height_start:].tobytes())
         offset = prior_tx_count * 32
         self.hashes_file.write(offset, hashes)
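
To make the offset arithmetic concrete, a short sketch with assumed numbers (not taken from the commit): flushing three 80-byte headers on top of fs_height = 99.

    # Illustrative only.
    fs_height = 99
    height_start = fs_height + 1        # 100
    offset = height_start * 80          # 8000 for a static 80-byte header coin
    new_height = fs_height + 3          # 102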
@@ -220,8 +229,8 @@ class DB(util.LoggedClass):
             raise self.DBError('{:,d} headers starting at {:,d} not on disk'
                                .format(count, start))
         if disk_count:
-            offset = self.coin.header_offset(start)
-            size = self.coin.header_offset(start + disk_count) - offset
+            offset = self.header_offset(start)
+            size = self.header_offset(start + disk_count) - offset
             return self.headers_file.read(offset, size)
         return b''
@@ -241,7 +250,7 @@ class DB(util.LoggedClass):
         offset = 0
         headers = []
         for n in range(count):
-            hlen = self.coin.header_len(height + n)
+            hlen = self.header_len(height + n)
             headers.append(headers_concat[offset:offset + hlen])
             offset += hlen
