Browse Source

Merge branch 'release-0.6.2'

master 0.6.2
Neil Booth 8 years ago
parent
commit
de2ae2a3ad
  1. 9
      docs/RELEASE-NOTES
  2. 194
      server/block_processor.py
  3. 24
      server/db.py
  4. 229
      server/protocol.py
  5. 2
      server/version.py

9
docs/RELEASE-NOTES

@ -1,3 +1,12 @@
version 0.6.2
-------------
- handle daemon errors properly that result from client requests; pass the
error onto the client
- start serving immediatley on catchup; don't wait for the mempool
- logging improvements, in particular logging software and DB versions
- issues closed: #29, #31, #32
version 0.6.1
-------------

194
server/block_processor.py

@ -19,8 +19,8 @@ from collections import defaultdict
from functools import partial
from server.daemon import Daemon, DaemonError
from server.version import VERSION
from lib.hash import hash_to_str
from lib.tx import Deserializer
from lib.util import chunks, LoggedClass
import server.db
from server.storage import open_db
@ -151,170 +151,6 @@ class ChainReorg(Exception):
'''Raised on a blockchain reorganisation.'''
class MemPool(LoggedClass):
'''Representation of the daemon's mempool.
Updated regularly in caught-up state. Goal is to enable efficient
response to the value() and transactions() calls.
To that end we maintain the following maps:
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
hash168 -> set of all tx hashes in which the hash168 appears
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
tx's txins are unconfirmed. tx hashes are hex strings.
'''
def __init__(self, bp):
super().__init__()
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
self.bp = bp
self.count = -1
async def update(self, hex_hashes):
'''Update state given the current mempool to the passed set of hashes.
Remove transactions that are no longer in our mempool.
Request new transactions we don't have then add to our mempool.
'''
hex_hashes = set(hex_hashes)
touched = set()
missing_utxos = []
initial = self.count < 0
if initial:
self.logger.info('beginning import of {:,d} mempool txs'
.format(len(hex_hashes)))
# Remove gone items
gone = set(self.txs).difference(hex_hashes)
for hex_hash in gone:
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
hash168s = set(hash168 for hash168, value in txin_pairs)
hash168s.update(hash168 for hash168, value in txout_pairs)
for hash168 in hash168s:
self.hash168s[hash168].remove(hex_hash)
if not self.hash168s[hash168]:
del self.hash168s[hash168]
touched.update(hash168s)
# Get the raw transactions for the new hashes. Ignore the
# ones the daemon no longer has (it will return None). Put
# them into a dictionary of hex hash to deserialized tx.
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('analysing {:,d} mempool txs'
.format(len(raw_txs)))
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes
# The mempool is unordered, so process all outputs first so
# that looking for inputs has full info.
script_hash168 = self.bp.coin.hash168_from_script()
db_utxo_lookup = self.bp.db_utxo_lookup
def txout_pair(txout):
return (script_hash168(txout.pk_script), txout.value)
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 100 == 0:
await asyncio.sleep(0)
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
self.txs[hex_hash] = (None, txout_pairs, None)
def txin_info(txin):
hex_hash = hash_to_str(txin.prev_hash)
mempool_entry = self.txs.get(hex_hash)
if mempool_entry:
return mempool_entry[1][txin.prev_idx], True
pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
return pair, False
if initial:
next_log = time.time()
self.logger.info('processed outputs, now examining inputs. '
'This can take some time...')
# Now add the inputs
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 10 == 0:
await asyncio.sleep(0)
if initial and time.time() > next_log:
next_log = time.time() + 20
self.logger.info('{:,d} done ({:d}%)'
.format(n, int(n / len(new_txs) * 100)))
txout_pairs = self.txs[hex_hash][1]
try:
infos = (txin_info(txin) for txin in tx.inputs)
txin_pairs, unconfs = zip(*infos)
except self.bp.MissingUTXOError:
# Drop this TX. If other mempool txs depend on it
# it's harmless - next time the mempool is refreshed
# they'll either be cleaned up or the UTXOs will no
# longer be missing.
del self.txs[hex_hash]
continue
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
# Update touched and self.hash168s for the new tx
for hash168, value in txin_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
for hash168, value in txout_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
if missing_utxos:
self.logger.info('{:,d} txs had missing UTXOs; probably the '
'daemon is a block or two ahead of us.'
.format(len(missing_utxos)))
first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
txin.prev_idx)
for txin in sorted(missing_utxos)[:3])
self.logger.info('first ones are {}'.format(first))
self.count += 1
if self.count % 25 == 0 or gone:
self.count = 0
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
# Might include a None
return touched
def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
for hex_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
tx_fee = (sum(v for hash168, v in txin_pairs)
- sum(v for hash168, v in txout_pairs))
yield (hex_hash, tx_fee, unconfirmed)
def value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
value = 0
for hex_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
value += sum(v for h168, v in txout_pairs if h168 == hash168)
return value
class BlockProcessor(server.db.DB):
'''Process blocks and update the DB state to match.
@ -334,7 +170,6 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(env.daemon_url, env.debug)
self.daemon.debug_set_height(self.height)
self.mempool = MemPool(self)
self.touched = set()
self.futures = []
@ -361,15 +196,9 @@ class BlockProcessor(server.db.DB):
self.db_deletes = []
# Log state
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
self.logger.info('reorg limit is {:,d} blocks'
.format(self.reorg_limit))
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB'
@ -428,11 +257,11 @@ class BlockProcessor(server.db.DB):
'''Called after each deamon poll if caught up.'''
# Caught up to daemon height. Flush everything as queries
# are performed on the DB and not in-memory.
self.flush(True)
if self.first_sync:
self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height))
self.touched.update(await self.mempool.update(mempool_hashes))
self.logger.info('{} synced to height {:,d}. DB version:'
.format(VERSION, self.height, self.db_version))
self.flush(True)
async def handle_chain_reorg(self):
# First get all state on disk
@ -1029,18 +858,3 @@ class BlockProcessor(server.db.DB):
tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
return tx_hash, tx_height
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
return self.mempool.transactions(hash168)
def mempool_value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
return self.mempool.value(hash168)

24
server/db.py

@ -18,6 +18,7 @@ from collections import namedtuple
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256, hash_to_str
from server.storage import open_db
from server.version import VERSION
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@ -29,7 +30,7 @@ class DB(LoggedClass):
it was shutdown uncleanly.
'''
VERSIONS = [3]
DB_VERSIONS = [3]
class MissingUTXOError(Exception):
'''Raised if a mempool tx input UTXO couldn't be found.'''
@ -77,6 +78,7 @@ class DB(LoggedClass):
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.db_version = max(self.DB_VERSIONS)
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
@ -87,11 +89,11 @@ class DB(LoggedClass):
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
db_version = state.get('db_version', 0)
if db_version not in self.VERSIONS:
self.db_version = state['db_version']
if self.db_version not in self.DB_VERSIONS:
raise self.DBError('your DB version is {} but this software '
'only handles versions {}'
.format(db_version, self.VERSIONS))
.format(db_version, self.DB_VERSIONS))
if state['genesis'] != self.coin.GENESIS_HASH:
raise self.DBError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
@ -104,6 +106,18 @@ class DB(LoggedClass):
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
self.logger.info('software version: {}'.format(VERSION))
self.logger.info('DB version: {:d}'.format(self.db_version))
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
def write_state(self, batch):
'''Write chain state to the batch.'''
state = {
@ -115,7 +129,7 @@ class DB(LoggedClass):
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': max(self.VERSIONS),
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())

229
server/protocol.py

@ -14,11 +14,12 @@ import json
import ssl
import time
import traceback
from collections import namedtuple
from collections import defaultdict, namedtuple
from functools import partial
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.jsonrpc import JSONRPC, json_notification_payload
from lib.tx import Deserializer
from lib.util import LoggedClass
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
@ -27,19 +28,25 @@ from server.version import VERSION
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also has a server manager and starts
servers when caught up.'''
'''Like BlockProcessor but also has a mempool and a server manager.
Servers are started immediately the block processor first catches
up with the daemon.
'''
def __init__(self, env):
super().__init__(env)
self.server_mgr = ServerManager(self, env)
self.bs_caught_up = False
self.mempool = MemPool(self)
self.caught_up_yet = False
async def caught_up(self, mempool_hashes):
# Call the base class to flush before doing anything else.
await super().caught_up(mempool_hashes)
if not self.bs_caught_up:
if not self.caught_up_yet:
await self.server_mgr.start_servers()
self.bs_caught_up = True
self.caught_up_yet = True
self.touched.update(await self.mempool.update(mempool_hashes))
self.server_mgr.notify(self.height, self.touched)
def on_cancel(self):
@ -47,6 +54,185 @@ class BlockServer(BlockProcessor):
self.server_mgr.stop()
super().on_cancel()
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
return self.mempool.transactions(hash168)
def mempool_value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
return self.mempool.value(hash168)
class MemPool(LoggedClass):
'''Representation of the daemon's mempool.
Updated regularly in caught-up state. Goal is to enable efficient
response to the value() and transactions() calls.
To that end we maintain the following maps:
tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
hash168 -> set of all tx hashes in which the hash168 appears
A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
tx's txins are unconfirmed. tx hashes are hex strings.
'''
def __init__(self, bp):
super().__init__()
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
self.bp = bp
self.count = -1
async def update(self, hex_hashes):
'''Update state given the current mempool to the passed set of hashes.
Remove transactions that are no longer in our mempool.
Request new transactions we don't have then add to our mempool.
'''
hex_hashes = set(hex_hashes)
touched = set()
missing_utxos = []
initial = self.count < 0
if initial:
self.logger.info('beginning import of {:,d} mempool txs'
.format(len(hex_hashes)))
# Remove gone items
gone = set(self.txs).difference(hex_hashes)
for hex_hash in gone:
txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
hash168s = set(hash168 for hash168, value in txin_pairs)
hash168s.update(hash168 for hash168, value in txout_pairs)
for hash168 in hash168s:
self.hash168s[hash168].remove(hex_hash)
if not self.hash168s[hash168]:
del self.hash168s[hash168]
touched.update(hash168s)
# Get the raw transactions for the new hashes. Ignore the
# ones the daemon no longer has (it will return None). Put
# them into a dictionary of hex hash to deserialized tx.
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('analysing {:,d} mempool txs'
.format(len(raw_txs)))
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes
# The mempool is unordered, so process all outputs first so
# that looking for inputs has full info.
script_hash168 = self.bp.coin.hash168_from_script()
db_utxo_lookup = self.bp.db_utxo_lookup
def txout_pair(txout):
return (script_hash168(txout.pk_script), txout.value)
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 100 == 0:
await asyncio.sleep(0)
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
self.txs[hex_hash] = (None, txout_pairs, None)
def txin_info(txin):
hex_hash = hash_to_str(txin.prev_hash)
mempool_entry = self.txs.get(hex_hash)
if mempool_entry:
return mempool_entry[1][txin.prev_idx], True
pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
return pair, False
if initial:
next_log = time.time()
self.logger.info('processed outputs, now examining inputs. '
'This can take some time...')
# Now add the inputs
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 10 == 0:
await asyncio.sleep(0)
if initial and time.time() > next_log:
next_log = time.time() + 20
self.logger.info('{:,d} done ({:d}%)'
.format(n, int(n / len(new_txs) * 100)))
txout_pairs = self.txs[hex_hash][1]
try:
infos = (txin_info(txin) for txin in tx.inputs)
txin_pairs, unconfs = zip(*infos)
except self.bp.MissingUTXOError:
# Drop this TX. If other mempool txs depend on it
# it's harmless - next time the mempool is refreshed
# they'll either be cleaned up or the UTXOs will no
# longer be missing.
del self.txs[hex_hash]
continue
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
# Update touched and self.hash168s for the new tx
for hash168, value in txin_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
for hash168, value in txout_pairs:
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
if missing_utxos:
self.logger.info('{:,d} txs had missing UTXOs; probably the '
'daemon is a block or two ahead of us.'
.format(len(missing_utxos)))
first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
txin.prev_idx)
for txin in sorted(missing_utxos)[:3])
self.logger.info('first ones are {}'.format(first))
self.count += 1
if self.count % 25 == 0 or gone:
self.count = 0
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
# Might include a None
return touched
def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
for hex_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
tx_fee = (sum(v for hash168, v in txin_pairs)
- sum(v for hash168, v in txout_pairs))
yield (hex_hash, tx_fee, unconfirmed)
def value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
value = 0
for hex_hash in self.hash168s[hash168]:
txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
value -= sum(v for h168, v in txin_pairs if h168 == hash168)
value += sum(v for h168, v in txout_pairs if h168 == hash168)
return value
class ServerManager(LoggedClass):
'''Manages the servers.'''
@ -163,8 +349,8 @@ class ServerManager(LoggedClass):
now = time.time()
return [(session.kind,
session.peername(for_log=False),
len(session.hash168s),
'RPC' if isinstance(session, LocalRPC) else session.client,
session.sub_count(),
session.client,
session.recv_count, session.recv_size,
session.send_count, session.send_size,
session.error_count,
@ -197,9 +383,7 @@ class Session(JSONRPC):
self.daemon = bp.daemon
self.coin = bp.coin
self.kind = kind
self.hash168s = set()
self.jobs = asyncio.Queue()
self.current_task = None
self.client = 'unknown'
def connection_made(self, transport):
@ -251,6 +435,16 @@ class Session(JSONRPC):
return 'xx.xx.xx.xx:xx'
return '{}:{}'.format(self.peer_info[0], self.peer_info[1])
def sub_count(self):
return 0
async def daemon_request(self, method, *args):
'''Catch a DaemonError and convert it to an RPCError.'''
try:
return await getattr(self.daemon, method)(*args)
except DaemonError as e:
raise RPCError('daemon error: {}'.format(e))
def tx_hash_from_param(self, param):
'''Raise an RPCError if the parameter is not a valid transaction
hash.'''
@ -309,6 +503,7 @@ class ElectrumX(Session):
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
self.hash168s = set()
rpcs = [
('blockchain',
'address.get_balance address.get_history address.get_mempool '
@ -324,6 +519,9 @@ class ElectrumX(Session):
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
def sub_count(self):
return len(self.hash168s)
async def notify(self, height, touched, cache):
'''Notify the client about changes in height and touched addresses.
@ -392,8 +590,8 @@ class ElectrumX(Session):
async def tx_merkle(self, tx_hash, height):
'''tx_hash is a hex string.'''
hex_hashes = await self.daemon.block_hex_hashes(height, 1)
block = await self.daemon.deserialised_block(hex_hashes[0])
hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
block = await self.daemon_request('deserialised_block', hex_hashes[0])
tx_hashes = block['tx']
try:
pos = tx_hashes.index(tx_hash)
@ -503,7 +701,7 @@ class ElectrumX(Session):
return self.electrum_header(height)
async def estimatefee(self, params):
return await self.daemon.estimatefee(params)
return await self.daemon_request('estimatefee', params)
async def headers_subscribe(self, params):
self.require_empty_params(params)
@ -519,7 +717,7 @@ class ElectrumX(Session):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
self.require_empty_params(params)
return await self.daemon.relayfee()
return await self.daemon_request('relayfee')
async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon.
@ -555,7 +753,7 @@ class ElectrumX(Session):
# in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0])
return await self.daemon.getrawtransaction(tx_hash)
return await self.daemon_request('getrawtransaction', tx_hash)
raise self.RPCError('params wrong length: {}'.format(params))
@ -629,3 +827,4 @@ class LocalRPC(Session):
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds}
self.client = 'RPC'

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.6.1"
VERSION = "ElectrumX 0.6.2"

Loading…
Cancel
Save