
Merge the DB and BlockProcessor classes

master
Neil Booth, 9 years ago
parent commit 2001d5c4f4
  1. server/block_processor.py (90 lines changed)
  2. server/controller.py (25 lines changed)
  3. server/protocol.py (13 lines changed)
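For orientation, here is a minimal runnable sketch of the wiring change this commit makes: the separate DB object disappears and BlockProcessor is constructed from the environment and daemon directly, taking over the state the DB class used to hold. The class bodies below are illustrative stand-ins, not the real ElectrumX code.

class Daemon:
    # Stand-in for server.daemon.Daemon
    def cached_height(self):
        return 0

class Env:
    # Stand-in for the server environment/settings object
    pass

class BlockProcessor:
    # After the merge the constructor takes env directly and owns the
    # chain state (height, caches, flushing) formerly kept on DB.
    def __init__(self, env, daemon):
        self.env = env
        self.daemon = daemon
        self.height = -1

class Controller:
    def __init__(self, env, daemon):
        # Old wiring (two objects):
        #   self.db = DB(env)
        #   self.block_processor = BlockProcessor(self.db, daemon)
        # New wiring (one object):
        self.block_processor = BlockProcessor(env, daemon)

controller = Controller(Env(), Daemon())
print(controller.block_processor.height)   # -1 before any blocks are processed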

server/block_processor.py (90 lines changed)

@@ -25,6 +25,10 @@ def formatted_time(t):
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
class ChainError(Exception):
pass
class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).'''
@@ -92,46 +96,10 @@ class BlockProcessor(LoggedClass):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, db, daemon):
def __init__(self, env, daemon):
super().__init__()
self.db = db
self.daemon = daemon
self.prefetcher = Prefetcher(daemon, db.height)
def coros(self):
return [self.start(), self.prefetcher.start()]
def flush_db(self):
self.db.flush(self.daemon.cached_height(), True)
async def start(self):
'''Loop forever processing blocks in the appropriate direction.'''
try:
while True:
blocks = await self.prefetcher.get_blocks()
for block in blocks:
self.db.process_block(block, self.daemon.cached_height())
# Release asynchronous block fetching
await asyncio.sleep(0)
if self.db.height == self.daemon.cached_height():
self.logger.info('caught up to height {:d}'
.format(self.db.height))
self.flush_db()
finally:
self.flush_db()
class DB(LoggedClass):
class Error(Exception):
pass
class ChainError(Exception):
pass
def __init__(self, env):
super().__init__()
self.daemon = daemon
# Meta
self.utxo_MB = env.utxo_MB
@@ -159,6 +127,7 @@ class DB(LoggedClass):
self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height)
# Redirected member func
self.get_tx_hash = self.fs_cache.get_tx_hash
@@ -176,6 +145,26 @@ class DB(LoggedClass):
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
def coros(self):
return [self.start(), self.prefetcher.start()]
async def start(self):
'''Loop forever processing blocks in the appropriate direction.'''
try:
while True:
blocks = await self.prefetcher.get_blocks()
for block in blocks:
self.process_block(block)
# Release asynchronous block fetching
await asyncio.sleep(0)
if self.height == self.daemon.cached_height():
self.logger.info('caught up to height {:d}'
.format(self.height))
self.flush(True)
finally:
if self.daemon.cached_height() is not None:
self.flush(True)
def open_db(self, coin):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
@@ -198,7 +187,7 @@ class DB(LoggedClass):
state = db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise self.Error('DB genesis hash {} does not match coin {}'
raise ChainError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
@@ -215,7 +204,7 @@ class DB(LoggedClass):
if diff == 0:
return
if diff < 0:
raise self.Error('DB corrupt: flush_count < utxo_flush_count')
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
self.logger.info('DB not shut down cleanly. Scanning for most '
'recent {:,d} history flushes'.format(diff))
@@ -260,7 +249,7 @@ class DB(LoggedClass):
self.db_tx_count = self.tx_count
self.db_height = self.height
def flush(self, daemon_height, flush_utxos=False):
def flush(self, flush_utxos=False):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
@@ -291,6 +280,7 @@ class DB(LoggedClass):
.format(self.flush_count, self.height, flush_time))
# Log handy stats
daemon_height = self.daemon.cached_height()
txs_per_sec = int(self.tx_count / self.wall_time)
this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
if self.height > self.coin.TX_COUNT_HEIGHT:
@@ -325,7 +315,7 @@ class DB(LoggedClass):
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def cache_sizes(self, daemon_height):
def cache_sizes(self):
'''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and
# requesting size from Python (see deep_getsizeof). For
@@ -339,7 +329,7 @@ class DB(LoggedClass):
hist_MB = hist_cache_size // one_MB
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
.format(self.height, daemon_height))
.format(self.height, self.daemon.cached_height()))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size: {:,d}'
.format(len(self.utxo_cache.cache),
@@ -350,16 +340,16 @@ class DB(LoggedClass):
.format(utxo_MB + hist_MB, utxo_MB, hist_MB))
return utxo_MB, hist_MB
def process_block(self, block, daemon_height):
def process_block(self, block):
# We must update the fs_cache before calling process_tx() as
# it uses the fs_cache for tx hash lookup
header, tx_hashes, txs = self.fs_cache.process_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
if prev_hash != self.tip:
raise self.ChainError('trying to build header with prev_hash {} '
'on top of tip with hash {}'
.format(hash_to_str(prev_hash),
hash_to_str(self.tip)))
raise ChainError('trying to build header with prev_hash {} '
'on top of tip with hash {}'
.format(hash_to_str(prev_hash),
hash_to_str(self.tip)))
self.tip = header_hash
self.height += 1
@@ -370,9 +360,9 @@ class DB(LoggedClass):
now = time.time()
if now > self.next_cache_check:
self.next_cache_check = now + 60
utxo_MB, hist_MB = self.cache_sizes(daemon_height)
utxo_MB, hist_MB = self.cache_sizes()
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(daemon_height, utxo_MB >= self.utxo_MB)
self.flush(utxo_MB >= self.utxo_MB)
def process_tx(self, tx_hash, tx):
cache = self.utxo_cache
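The new start() loop above now calls process_block() and flush() on self rather than on a separate db object, and the catch-up check and flush decisions are driven by the processor's own state. A self-contained sketch of that control flow, with stand-in methods in place of the real prefetcher, daemon and caches:

import asyncio

class MergedLoopSketch:
    # Illustrative only: a pending list and daemon_height replace the real
    # Prefetcher and Daemon; process_block() and flush() are stubs.
    def __init__(self, blocks, daemon_height):
        self.pending = list(blocks)
        self.daemon_height = daemon_height
        self.height = -1

    async def get_blocks(self):
        await asyncio.sleep(0)                 # yield, like the prefetcher
        return [self.pending.pop(0)] if self.pending else []

    def process_block(self, block):
        self.height += 1                       # stand-in for real indexing

    def flush(self, flush_utxos=False):
        print('flush at height', self.height, 'flush_utxos =', flush_utxos)

    async def start(self):
        try:
            while self.height < self.daemon_height:
                for block in await self.get_blocks():
                    self.process_block(block)
                await asyncio.sleep(0)         # release async block fetching
            print('caught up to height', self.height)
        finally:
            self.flush(True)                   # always flush on exit

asyncio.run(MergedLoopSketch(['b0', 'b1', 'b2'], 2).start())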

server/controller.py (25 lines changed)

@@ -7,7 +7,7 @@ import traceback
from functools import partial
from server.daemon import Daemon, DaemonError
from server.block_processor import BlockProcessor, DB
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC
from lib.hash import (sha256, double_sha256, hash_to_str,
Base58, hex_str_to_hash)
@@ -24,9 +24,8 @@ class Controller(LoggedClass):
super().__init__()
self.loop = loop
self.env = env
self.db = DB(env)
self.daemon = Daemon(env.daemon_url)
self.block_processor = BlockProcessor(self.db, self.daemon)
self.block_processor = BlockProcessor(env, self.daemon)
self.servers = []
self.sessions = set()
self.addresses = {}
@@ -112,10 +111,9 @@ class Controller(LoggedClass):
'''Returns status as 32 bytes.'''
status = self.addresses.get(hash168)
if status is None:
status = ''.join(
'{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in self.db.get_history(hash168)
)
history = self.block_processor.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
status = sha256(status.encode())
self.addresses[hash168] = status
@@ -148,3 +146,16 @@ class Controller(LoggedClass):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return self.peers
def height(self):
return self.block_processor.height
def get_current_header(self):
return self.block_processor.get_current_header()
def get_history(self, hash168):
history = self.block_processor.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]
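With the DB class gone, the Controller becomes the single access point for chain state: it keeps a BlockProcessor and exposes small pass-through helpers (height(), get_current_header(), get_history()) for the protocol layer. A hedged sketch of that shape, with an illustrative stub in place of the real BlockProcessor:

class BlockProcessorStub:
    # Illustrative stand-in for the merged BlockProcessor
    height = 1000

    def get_history(self, hash168, limit=None):
        return [(bytes(32), 42)]               # (tx_hash, height) pairs

class ControllerSketch:
    def __init__(self, block_processor):
        self.block_processor = block_processor

    def height(self):
        return self.block_processor.height

    def get_history(self, hash168):
        # hash_to_str() in the real code; .hex() keeps this sketch standalone
        return [{'tx_hash': tx_hash.hex(), 'height': height}
                for tx_hash, height in
                self.block_processor.get_history(hash168, limit=None)]

controller = ControllerSketch(BlockProcessorStub())
print(controller.height())
print(controller.get_history(b'example-hash168'))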

server/protocol.py (13 lines changed)

@@ -102,9 +102,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, controller, db, daemon, env):
def __init__(self, controller, daemon, env):
super().__init__(controller)
self.db = db
self.daemon = daemon
self.env = env
self.addresses = set()
@@ -123,11 +122,7 @@ class ElectrumX(JSONRPC):
async def handle_blockchain_address_get_history(self, params):
hash168 = self.params_to_hash168(params)
history = [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in self.db.get_history(hash168, limit=None)
]
return history
return self.controller.get_history(hash168)
async def handle_blockchain_address_subscribe(self, params):
hash168 = self.params_to_hash168(params)
@@ -140,7 +135,7 @@ class ElectrumX(JSONRPC):
async def handle_blockchain_headers_subscribe(self, params):
self.subscribe_headers = True
return self.db.get_current_header()
return self.controller.get_current_header()
async def handle_blockchain_relayfee(self, params):
'''The minimum fee a low-priority tx must pay in order to be accepted
@@ -201,7 +196,7 @@ class LocalRPC(JSONRPC):
async def handle_getinfo(self, params):
return {
'blocks': self.controller.db.height,
'blocks': self.controller.height(),
'peers': len(self.controller.get_peers()),
'sessions': len(self.controller.sessions),
'watched': sum(len(s.addresses) for s in self.controller.sessions
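On the protocol side the handlers now go through the Controller rather than holding a db reference of their own. A minimal sketch of the resulting handler shape, using an illustrative controller stub:

import asyncio

class ControllerStub:
    # Illustrative stand-in for the Controller helpers used below
    def height(self):
        return 1000

    def get_history(self, hash168):
        return [{'tx_hash': '00' * 32, 'height': 42}]

class ElectrumXSketch:
    def __init__(self, controller):
        self.controller = controller           # no self.db any more

    async def handle_blockchain_address_get_history(self, hash168):
        return self.controller.get_history(hash168)

    async def handle_getinfo(self, params=None):
        return {'blocks': self.controller.height()}

session = ElectrumXSketch(ControllerStub())
print(asyncio.run(session.handle_getinfo()))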
