
Merge branch 'develop'

Branch: master
Author: Neil Booth (8 years ago)
Parent commit: 957dfb6fb8
4 changed files:

  lib/util.py                 10 lines changed
  server/block_processor.py   13 lines changed
  server/mempool.py          287 lines added (new file)
  server/protocol.py         267 lines changed

lib/util.py (10 lines changed)

@@ -9,6 +9,7 @@
 import array
+import asyncio
 import inspect
 import logging
 import sys
@@ -132,3 +133,12 @@ def increment_byte_string(bs):
         # This can only happen if all characters are 0xff
         bs = bytes([1]) + bs
     return bytes(bs)
+
+async def asyncio_clean_shutdown(loop=None):
+    while True:
+        pending_tasks = [task for task in asyncio.Task.all_tasks(loop)
+                         if not task.done()]
+        if len(pending_tasks) <= 1:
+            break
+        await asyncio.sleep(0)
+    await asyncio.sleep(1)
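
For context, a minimal sketch of how this new helper might be driven at shutdown. The `worker` task and `main` driver here are hypothetical, and the pre-3.7 `asyncio.Task.all_tasks` API is used to match the diff (it was removed in Python 3.9):

import asyncio

async def asyncio_clean_shutdown(loop=None):
    # Same logic as the helper above: spin until only this coroutine's
    # own task is still pending, then allow a final one-second grace.
    while True:
        pending = [t for t in asyncio.Task.all_tasks(loop) if not t.done()]
        if len(pending) <= 1:
            break
        await asyncio.sleep(0)
    await asyncio.sleep(1)

async def worker():
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass  # cancelled at shutdown; exit cleanly

async def main():
    task = asyncio.ensure_future(worker())
    task.cancel()                    # request shutdown
    await asyncio_clean_shutdown()   # drain remaining tasks before close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()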

server/block_processor.py (13 lines changed)

@@ -132,11 +132,9 @@ class BlockProcessor(server.db.DB):
     Coordinate backing up in case of chain reorganisations.
     '''

-    def __init__(self, client, env):
+    def __init__(self, env, touched, touched_event):
         super().__init__(env)
-        self.client = client

         # The block processor reads its tasks from this queue
         self.tasks = asyncio.Queue()
@@ -151,6 +149,8 @@ class BlockProcessor(server.db.DB):
         self.caught_up = False
         self._shutdown = False
         self.event = asyncio.Event()
+        self.touched = touched
+        self.touched_event = touched_event

         # Meta
         self.utxo_MB = env.utxo_MB
@@ -218,9 +218,8 @@ class BlockProcessor(server.db.DB):
             for block in blocks:
                 if self._shutdown:
                     break
-                self.advance_block(block, touched)
+                self.advance_block(block, self.touched)

-        touched = set()
         loop = asyncio.get_event_loop()
         try:
             if self.caught_up:
@@ -228,14 +227,14 @@ class BlockProcessor(server.db.DB):
             else:
                 do_it()
         except ChainReorg:
-            await self.handle_chain_reorg(touched)
+            await self.handle_chain_reorg(self.touched)

         if self.caught_up:
             # Flush everything as queries are performed on the DB and
             # not in-memory.
             await asyncio.sleep(0)
             self.flush(True)
-            self.client.notify(touched)
+            self.touched_event.set()
         elif time.time() > self.next_cache_check:
             self.check_cache_size()
             self.next_cache_check = time.time() + 60
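
The net effect of these hunks: instead of calling back into a client object, the block processor now records touched addresses in a shared set and signals an event that a notifier consumes. A minimal sketch of that hand-off (the names here are illustrative, not the project's):

import asyncio

touched = set()
touched_event = asyncio.Event()

async def producer():
    # Stands in for the block processor: record work, then signal
    for addr in ('a1', 'a2', 'a3'):
        touched.add(addr)
        touched_event.set()
        await asyncio.sleep(0)

async def consumer():
    # Stands in for the notifier: snapshot, reset shared state, act
    for _ in range(3):  # bounded for the example
        await touched_event.wait()
        batch = touched.copy()
        touched.clear()
        touched_event.clear()
        print('notify sessions about', sorted(batch))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
loop.close()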

server/mempool.py (new file, 287 lines)

@@ -0,0 +1,287 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.

'''Mempool handling.'''

import asyncio
import itertools
import time
from collections import defaultdict
from functools import partial

from lib.hash import hash_to_str, hex_str_to_hash
from lib.tx import Deserializer
import lib.util as util
from server.daemon import DaemonError


class MemPool(util.LoggedClass):
    '''Representation of the daemon's mempool.

    Updated regularly in caught-up state. Goal is to enable efficient
    response to the value() and transactions() calls.

    To that end we maintain the following maps:

       tx_hash -> (txin_pairs, txout_pairs)
       hash168 -> set of all tx hashes in which the hash168 appears

    A pair is a (hash168, value) tuple. tx hashes are hex strings.
    '''

    def __init__(self, daemon, coin, db, touched, touched_event):
        super().__init__()
        self.daemon = daemon
        self.coin = coin
        self.db = db
        self.touched = touched
        self.touched_event = touched_event
        self.stop = False
        self.txs = {}
        self.hash168s = defaultdict(set)  # None can be a key
    async def main_loop(self, caught_up):
        '''Asynchronously maintain mempool status with daemon.

        Waits until the caught up event is signalled.'''
        await caught_up.wait()
        self.logger.info('beginning processing of daemon mempool. '
                         'This can take some time...')
        try:
            await self.fetch_and_process()
        except asyncio.CancelledError:
            # This aids clean shutdowns
            self.stop = True
    async def fetch_and_process(self):
        '''The inner loop unprotected by try / except.'''
        unfetched = set()
        unprocessed = {}
        log_every = 150
        log_secs = 0
        fetch_size = 400
        process_some = self.async_process_some(unfetched, fetch_size // 2)
        next_refresh = 0
        # The list of mempool hashes is fetched no more frequently
        # than this number of seconds
        refresh_secs = 5

        while True:
            try:
                now = time.time()
                if now >= next_refresh:
                    await self.new_hashes(unprocessed, unfetched)
                    next_refresh = now + refresh_secs
                    log_secs -= refresh_secs

                # Fetch some txs if unfetched ones remain
                if unfetched:
                    count = min(len(unfetched), fetch_size)
                    hex_hashes = [unfetched.pop() for n in range(count)]
                    unprocessed.update(await self.fetch_raw_txs(hex_hashes))

                # Process some txs if unprocessed ones remain
                if unprocessed:
                    await process_some(unprocessed)

                if self.touched:
                    self.touched_event.set()

                if log_secs <= 0 and not unprocessed:
                    log_secs = log_every
                    self.logger.info('{:,d} txs touching {:,d} addresses'
                                     .format(len(self.txs),
                                             len(self.hash168s)))
                await asyncio.sleep(1)
            except DaemonError as e:
                self.logger.info('ignoring daemon error: {}'.format(e))
    async def new_hashes(self, unprocessed, unfetched):
        '''Get the current list of hashes in the daemon's mempool.

        Remove ones that have disappeared from self.txs and unprocessed.
        '''
        txs = self.txs
        hash168s = self.hash168s
        touched = self.touched

        hashes = set(await self.daemon.mempool_hashes())
        new = hashes.difference(txs)
        gone = set(txs).difference(hashes)
        for hex_hash in gone:
            unprocessed.pop(hex_hash, None)
            item = txs.pop(hex_hash)
            if item:
                txin_pairs, txout_pairs = item
                tx_hash168s = set(hash168 for hash168, value in txin_pairs)
                tx_hash168s.update(hash168 for hash168, value in txout_pairs)
                for hash168 in tx_hash168s:
                    hash168s[hash168].remove(hex_hash)
                    if not hash168s[hash168]:
                        del hash168s[hash168]
                touched.update(tx_hash168s)

        unfetched.update(new)
        for hex_hash in new:
            txs[hex_hash] = None
    def async_process_some(self, unfetched, limit):
        loop = asyncio.get_event_loop()
        pending = []
        txs = self.txs

        async def process(unprocessed):
            nonlocal pending

            raw_txs = {}
            while unprocessed and len(raw_txs) < limit:
                hex_hash, raw_tx = unprocessed.popitem()
                raw_txs[hex_hash] = raw_tx

            if unprocessed:
                deferred = []
            else:
                deferred = pending
                pending = []

            process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred)
            result, deferred = (
                await loop.run_in_executor(None, process_raw_txs))
            pending.extend(deferred)

            hash168s = self.hash168s
            touched = self.touched
            for hex_hash, in_out_pairs in result.items():
                if hex_hash in txs:
                    txs[hex_hash] = in_out_pairs
                    for hash168, value in itertools.chain(*in_out_pairs):
                        touched.add(hash168)
                        hash168s[hash168].add(hex_hash)

            to_do = len(unfetched) + len(unprocessed)
            if to_do:
                percent = (len(txs) - to_do) * 100 // len(txs)
                self.logger.info('catchup {:d}% complete'.format(percent))

        return process
    async def fetch_raw_txs(self, hex_hashes):
        '''Fetch a list of mempool transactions.'''
        raw_txs = await self.daemon.getrawtransactions(hex_hashes)

        # Skip hashes the daemon has dropped. Either they were
        # evicted or they got in a block.
        return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw}
    def process_raw_txs(self, raw_tx_map, pending):
        '''Process the dictionary of raw transactions and return a dictionary
        of updates to apply to self.txs.

        This runs in the executor so should not update any member
        variables it doesn't own. Atomic reads of self.txs that do
        not depend on the result remaining the same are fine.
        '''
        script_hash168 = self.coin.hash168_from_script()
        db_utxo_lookup = self.db.db_utxo_lookup
        txs = self.txs

        # Deserialize each tx and put it on the pending list
        for tx_hash, raw_tx in raw_tx_map.items():
            if tx_hash not in txs:
                continue
            tx = Deserializer(raw_tx).read_tx()

            # Convert the tx outputs into (hash168, value) pairs
            txout_pairs = [(script_hash168(txout.pk_script), txout.value)
                           for txout in tx.outputs]

            # Convert the tx inputs to (prev_hex_hash, prev_idx) pairs
            txin_pairs = [(hash_to_str(txin.prev_hash), txin.prev_idx)
                          for txin in tx.inputs]

            pending.append((tx_hash, txin_pairs, txout_pairs))

        # Now process what we can
        result = {}
        deferred = []

        for item in pending:
            if self.stop:
                break

            tx_hash, old_txin_pairs, txout_pairs = item
            if tx_hash not in txs:
                continue

            mempool_missing = False
            txin_pairs = []

            try:
                for prev_hex_hash, prev_idx in old_txin_pairs:
                    tx_info = txs.get(prev_hex_hash, 0)
                    if tx_info is None:  # In the mempool but not processed yet
                        tx_info = result.get(prev_hex_hash)
                        if not tx_info:
                            mempool_missing = True
                            continue
                    if tx_info:
                        txin_pairs.append(tx_info[1][prev_idx])
                    elif not mempool_missing:
                        prev_hash = hex_str_to_hash(prev_hex_hash)
                        txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx))
            except self.db.MissingUTXOError:
                # This typically happens just after the daemon has
                # accepted a new block and the new mempool has deps on
                # new txs in that block.
                continue

            if mempool_missing:
                deferred.append(item)
            else:
                result[tx_hash] = (txin_pairs, txout_pairs)

        return result, deferred
    async def transactions(self, hash168):
        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
        entries for the hash168.

        unconfirmed is True if any txin is unconfirmed.
        '''
        # hash168s is a defaultdict
        if hash168 not in self.hash168s:
            return []

        hex_hashes = self.hash168s[hash168]
        raw_txs = await self.daemon.getrawtransactions(hex_hashes)
        result = []
        for hex_hash, raw_tx in zip(hex_hashes, raw_txs):
            item = self.txs.get(hex_hash)
            if not item or not raw_tx:
                continue
            tx = Deserializer(raw_tx).read_tx()
            txin_pairs, txout_pairs = item
            tx_fee = (sum(v for hash168, v in txin_pairs)
                      - sum(v for hash168, v in txout_pairs))
            # A txin is unconfirmed if its previous tx is in the mempool;
            # self.txs is keyed by hex hash, so convert before the lookup
            unconfirmed = any(hash_to_str(txin.prev_hash) in self.txs
                              for txin in tx.inputs)
            result.append((hex_hash, tx_fee, unconfirmed))
        return result
    def value(self, hash168):
        '''Return the unconfirmed amount in the mempool for hash168.

        Can be positive or negative.
        '''
        value = 0
        # hash168s is a defaultdict
        if hash168 in self.hash168s:
            for hex_hash in self.hash168s[hash168]:
                txin_pairs, txout_pairs = self.txs[hex_hash]
                value -= sum(v for h168, v in txin_pairs if h168 == hash168)
                value += sum(v for h168, v in txout_pairs if h168 == hash168)
        return value
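
The class docstring above describes the whole data model: one map from tx hash to its (hash168, value) pairs, plus an inverted index from hash168 to tx hashes. A toy, self-contained model (hypothetical data, simplified keys) showing how value() falls out of those two maps:

from collections import defaultdict

txs = {}                     # tx_hash -> (txin_pairs, txout_pairs)
hash168s = defaultdict(set)  # hash168 -> set of tx hashes touching it

def add_tx(tx_hash, txin_pairs, txout_pairs):
    txs[tx_hash] = (txin_pairs, txout_pairs)
    for h168, _ in txin_pairs + txout_pairs:
        hash168s[h168].add(tx_hash)

def value(hash168):
    # Net unconfirmed amount, mirroring MemPool.value()
    v = 0
    for tx_hash in hash168s.get(hash168, ()):
        txin_pairs, txout_pairs = txs[tx_hash]
        v -= sum(val for h, val in txin_pairs if h == hash168)
        v += sum(val for h, val in txout_pairs if h == hash168)
    return v

# One mempool tx in which 'addr' spends 70000 and gets 50000 back;
# the remaining 1000 is the implicit fee
add_tx('ab' * 32, [('addr', 70000)], [('addr', 50000), ('other', 19000)])
assert value('addr') == -20000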

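process_raw_txs() above has one subtle job: an input may spend another mempool tx that has not been parsed yet, so such items are deferred and retried on a later pass rather than treated as errors. A condensed sketch of that defer-and-retry scheme (helper names are hypothetical; the DB fallback for confirmed inputs is omitted):

def process(pending, known):
    '''known maps tx_hash -> (txin_pairs, txout_pairs) already resolved.'''
    result, deferred = {}, []
    for tx_hash, txin_refs, txout_pairs in pending:
        txin_pairs, missing = [], False
        for prev_hash, prev_idx in txin_refs:
            prev = known.get(prev_hash) or result.get(prev_hash)
            if prev is None:
                missing = True   # parent not processed yet; retry later
                break
            txin_pairs.append(prev[1][prev_idx])
        if missing:
            deferred.append((tx_hash, txin_refs, txout_pairs))
        else:
            result[tx_hash] = (txin_pairs, txout_pairs)
    return result, deferred

known = {'p': ([], [('addr', 5000)])}
pending = [('c', [('p', 0)], [('addr2', 4000)]),
           ('d', [('q', 0)], [])]  # 'q' is unknown, so 'd' is deferred
result, deferred = process(pending, known)
assert 'c' in result and deferred == [('d', [('q', 0)], [])]
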
server/protocol.py (267 lines changed)

@@ -22,192 +22,14 @@ import pylru
 from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 from lib.jsonrpc import JSONRPC, RequestBase
 from lib.tx import Deserializer
 import lib.util as util
 from server.block_processor import BlockProcessor
 from server.daemon import DaemonError
 from server.irc import IRC
+from server.mempool import MemPool
 from server.version import VERSION


-class MemPool(util.LoggedClass):
-    '''Representation of the daemon's mempool.
-
-    Updated regularly in caught-up state. Goal is to enable efficient
-    response to the value() and transactions() calls.
-
-    To that end we maintain the following maps:
-
-       tx_hash -> [txin_pairs, txout_pairs, unconfirmed]
-       hash168 -> set of all tx hashes in which the hash168 appears
-
-    A pair is a (hash168, value) tuple. Unconfirmed is true if any of the
-    tx's txins are unconfirmed. tx hashes are hex strings.
-    '''
-
-    def __init__(self, daemon, coin, db, manager):
-        super().__init__()
-        self.daemon = daemon
-        self.coin = coin
-        self.db = db
-        self.manager = manager
-        self.txs = {}
-        self.hash168s = defaultdict(set)  # None can be a key
-        self.count = -1
-
-    async def main_loop(self, caught_up):
-        '''Asynchronously maintain mempool status with daemon.
-
-        Waits until the caught up event is signalled.'''
-        await caught_up.wait()
-        self.logger.info('maintaining state with daemon...')
-        while True:
-            try:
-                await self.update()
-                await asyncio.sleep(5)
-            except DaemonError as e:
-                self.logger.info('ignoring daemon error: {}'.format(e))
-
-    async def update(self):
-        '''Update state given the current mempool to the passed set of hashes.
-
-        Remove transactions that are no longer in our mempool.
-        Request new transactions we don't have then add to our mempool.
-        '''
-        hex_hashes = set(await self.daemon.mempool_hashes())
-        touched = set()
-        missing_utxos = []
-
-        initial = self.count < 0
-        if initial:
-            self.logger.info('beginning import of {:,d} mempool txs'
-                             .format(len(hex_hashes)))
-
-        # Remove gone items
-        gone = set(self.txs).difference(hex_hashes)
-        for hex_hash in gone:
-            txin_pairs, txout_pairs, unconfirmed = self.txs.pop(hex_hash)
-            hash168s = set(hash168 for hash168, value in txin_pairs)
-            hash168s.update(hash168 for hash168, value in txout_pairs)
-            for hash168 in hash168s:
-                self.hash168s[hash168].remove(hex_hash)
-                if not self.hash168s[hash168]:
-                    del self.hash168s[hash168]
-            touched.update(hash168s)
-
-        # Get the raw transactions for the new hashes. Ignore the
-        # ones the daemon no longer has (it will return None). Put
-        # them into a dictionary of hex hash to deserialized tx.
-        hex_hashes.difference_update(self.txs)
-        raw_txs = await self.daemon.getrawtransactions(hex_hashes)
-        if initial:
-            self.logger.info('analysing {:,d} mempool txs'
-                             .format(len(raw_txs)))
-        new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
-                   for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
-        del raw_txs, hex_hashes
-
-        # The mempool is unordered, so process all outputs first so
-        # that looking for inputs has full info.
-        script_hash168 = self.coin.hash168_from_script()
-        db_utxo_lookup = self.db.db_utxo_lookup
-
-        def txout_pair(txout):
-            return (script_hash168(txout.pk_script), txout.value)
-
-        for n, (hex_hash, tx) in enumerate(new_txs.items()):
-            # Yield to process e.g. signals
-            if n % 20 == 0:
-                await asyncio.sleep(0)
-            txout_pairs = [txout_pair(txout) for txout in tx.outputs]
-            self.txs[hex_hash] = (None, txout_pairs, None)
-
-        def txin_info(txin):
-            hex_hash = hash_to_str(txin.prev_hash)
-            mempool_entry = self.txs.get(hex_hash)
-            if mempool_entry:
-                return mempool_entry[1][txin.prev_idx], True
-            pair = db_utxo_lookup(txin.prev_hash, txin.prev_idx)
-            return pair, False
-
-        if initial:
-            next_log = time.time()
-            self.logger.info('processed outputs, now examining inputs. '
-                             'This can take some time...')
-
-        # Now add the inputs
-        for n, (hex_hash, tx) in enumerate(new_txs.items()):
-            # Yield to process e.g. signals
-            await asyncio.sleep(0)
-
-            if initial and time.time() > next_log:
-                next_log = time.time() + 20
-                self.logger.info('{:,d} done ({:d}%)'
-                                 .format(n, int(n / len(new_txs) * 100)))
-
-            txout_pairs = self.txs[hex_hash][1]
-            try:
-                infos = (txin_info(txin) for txin in tx.inputs)
-                txin_pairs, unconfs = zip(*infos)
-            except self.db.MissingUTXOError:
-                # Drop this TX. If other mempool txs depend on it
-                # it's harmless - next time the mempool is refreshed
-                # they'll either be cleaned up or the UTXOs will no
-                # longer be missing.
-                del self.txs[hex_hash]
-                continue
-            self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
-
-            # Update touched and self.hash168s for the new tx
-            for hash168, value in txin_pairs:
-                self.hash168s[hash168].add(hex_hash)
-                touched.add(hash168)
-            for hash168, value in txout_pairs:
-                self.hash168s[hash168].add(hex_hash)
-                touched.add(hash168)
-
-        if missing_utxos:
-            self.logger.info('{:,d} txs had missing UTXOs; probably the '
-                             'daemon is a block or two ahead of us.'
-                             .format(len(missing_utxos)))
-            first = ', '.join('{} / {:,d}'.format(hash_to_str(txin.prev_hash),
-                                                  txin.prev_idx)
-                              for txin in sorted(missing_utxos)[:3])
-            self.logger.info('first ones are {}'.format(first))
-
-        self.count += 1
-        if self.count % 25 == 0 or gone:
-            self.count = 0
-            self.logger.info('{:,d} txs touching {:,d} addresses'
-                             .format(len(self.txs), len(self.hash168s)))
-
-        self.manager.notify(touched)
-
-    def transactions(self, hash168):
-        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
-        entries for the hash168.
-
-        unconfirmed is True if any txin is unconfirmed.
-        '''
-        for hex_hash in self.hash168s[hash168]:
-            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
-            tx_fee = (sum(v for hash168, v in txin_pairs)
-                      - sum(v for hash168, v in txout_pairs))
-            yield (hex_hash, tx_fee, unconfirmed)
-
-    def value(self, hash168):
-        '''Return the unconfirmed amount in the mempool for hash168.
-
-        Can be positive or negative.
-        '''
-        value = 0
-        for hex_hash in self.hash168s[hash168]:
-            txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
-            value -= sum(v for h168, v in txin_pairs if h168 == hash168)
-            value += sum(v for h168, v in txout_pairs if h168 == hash168)
-        return value
-
-
 class ServerManager(util.LoggedClass):
     '''Manages the client servers, a mempool, and a block processor.
@@ -229,9 +51,13 @@ class ServerManager(util.LoggedClass):
     def __init__(self, env):
         super().__init__()
         self.loop = asyncio.get_event_loop()
+        self.touched = set()
+        self.touched_event = asyncio.Event()
         self.start = time.time()
-        self.bp = BlockProcessor(self, env)
-        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
+        self.bp = BlockProcessor(env, self.touched, self.touched_event)
+        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp,
+                               self.touched, self.touched_event)
         self.irc = IRC(env)
        self.env = env
        self.servers = []
@@ -261,13 +87,13 @@
         self.logger.info('max subscriptions per session: {:,d}'
                          .format(env.max_session_subs))

-    def mempool_transactions(self, hash168):
+    async def mempool_transactions(self, hash168):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
         entries for the hash168.

         unconfirmed is True if any txin is unconfirmed.
         '''
-        return self.mempool.transactions(hash168)
+        return await self.mempool.transactions(hash168)

     def mempool_value(self, hash168):
         '''Return the unconfirmed amount in the mempool for hash168.
@@ -343,10 +169,11 @@
         # shutdown() assumes bp.main_loop() is first
         add_future(self.bp.main_loop())
         add_future(self.bp.prefetcher.main_loop())
-        add_future(self.mempool.main_loop(self.bp.event))
         add_future(self.irc.start(self.bp.event))
         add_future(self.start_servers(self.bp.event))
+        add_future(self.mempool.main_loop(self.bp.event))
         add_future(self.enqueue_delayed_sessions())
+        add_future(self.notify())
         for n in range(4):
             add_future(self.serve_requests())
@@ -356,12 +183,12 @@
             except asyncio.CancelledError:
                 break
         await self.shutdown()
+        await util.asyncio_clean_shutdown()

     async def start_server(self, kind, *args, **kw_args):
-        loop = asyncio.get_event_loop()
         protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
         protocol = partial(protocol_class, self, self.bp, self.env, kind)
-        server = loop.create_server(protocol, *args, **kw_args)
+        server = self.loop.create_server(protocol, *args, **kw_args)

         host, port = args[:2]
         try:
@@ -395,27 +222,34 @@
         sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
         await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)

-    def notify(self, touched):
+    async def notify(self):
         '''Notify sessions about height changes and touched addresses.'''
-        # Invalidate caches
-        hc = self.history_cache
-        for hash168 in set(hc).intersection(touched):
-            del hc[hash168]
-        if self.bp.db_height != self.height:
-            self.height = self.bp.db_height
-            self.header_cache.clear()
+        while True:
+            await self.touched_event.wait()
+            touched = self.touched.copy()
+            self.touched.clear()
+            self.touched_event.clear()
+
+            # Invalidate caches
+            hc = self.history_cache
+            for hash168 in set(hc).intersection(touched):
+                del hc[hash168]
+            if self.bp.db_height != self.height:
+                self.height = self.bp.db_height
+                self.header_cache.clear()

-        for session in self.sessions:
-            if isinstance(session, ElectrumX):
-                request = self.NotificationRequest(self.bp.db_height, touched)
-                session.enqueue_request(request)
-        # Periodically log sessions
-        if self.env.log_sessions and time.time() > self.next_log_sessions:
-            data = self.session_data(for_log=True)
-            for line in ServerManager.sessions_text_lines(data):
-                self.logger.info(line)
-            self.logger.info(json.dumps(self.server_summary()))
-            self.next_log_sessions = time.time() + self.env.log_sessions
+            for session in self.sessions:
+                if isinstance(session, ElectrumX):
+                    request = self.NotificationRequest(self.bp.db_height,
+                                                       touched)
+                    session.enqueue_request(request)
+            # Periodically log sessions
+            if self.env.log_sessions and time.time() > self.next_log_sessions:
+                data = self.session_data(for_log=True)
+                for line in ServerManager.sessions_text_lines(data):
+                    self.logger.info(line)
+                self.logger.info(json.dumps(self.server_summary()))
+                self.next_log_sessions = time.time() + self.env.log_sessions
def electrum_header(self, height):
'''Return the binary header at the given height.'''
@@ -457,8 +291,6 @@
             server.close()
             await server.wait_closed()
         self.servers = []  # So add_session closes new sessions
-        while not all(future.done() for future in self.futures):
-            await asyncio.sleep(0)

         if self.sessions:
             await self.close_sessions()
@@ -474,7 +306,6 @@
             await asyncio.sleep(2)
             self.logger.info('{:,d} sessions remaining'
                              .format(len(self.sessions)))
-        await asyncio.sleep(1)

     def add_session(self, session):
         # Some connections are acknowledged after the servers are closed
@@ -491,9 +322,12 @@
                          .format(session.peername(), len(self.sessions)))

     def remove_session(self, session):
-        group = self.sessions.pop(session)
-        group.remove(session)
-        self.subscription_count -= session.sub_count()
+        # This test should always be True. However if a bug messes
+        # things up it prevents consequent log noise
+        if session in self.sessions:
+            group = self.sessions.pop(session)
+            group.remove(session)
+            self.subscription_count -= session.sub_count()

     def close_session(self, session):
         '''Close the session's transport and cancel its future.'''
@@ -762,6 +596,7 @@ class Session(JSONRPC):
                 self.log_error('error handling request {}'.format(request))
                 traceback.print_exc()
                 errs.append(request)
+            await asyncio.sleep(0)

             if total >= 8:
                 break
@@ -905,7 +740,7 @@ class ElectrumX(Session):
         # Note history is ordered and mempool unordered in electrum-server
         # For mempool, height is -1 if unconfirmed txins, otherwise 0
         history = await self.manager.async_get_history(hash168)
-        mempool = self.manager.mempool_transactions(hash168)
+        mempool = await self.manager.mempool_transactions(hash168)

         status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
                          for tx_hash, height in history)
@@ -940,10 +775,10 @@
         return {"block_height": height, "merkle": merkle_branch, "pos": pos}

-    def unconfirmed_history(self, hash168):
+    async def unconfirmed_history(self, hash168):
         # Note unconfirmed history is unordered in electrum-server
         # Height is -1 if unconfirmed txins, otherwise 0
-        mempool = self.manager.mempool_transactions(hash168)
+        mempool = await self.manager.mempool_transactions(hash168)
         return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
                 for tx_hash, fee, unconfirmed in mempool]
@@ -953,7 +788,7 @@
         conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height}
                 for tx_hash, height in history]

-        return conf + self.unconfirmed_history(hash168)
+        return conf + await self.unconfirmed_history(hash168)

     def get_chunk(self, index):
         '''Return header chunk as hex. Index is a non-negative integer.'''
@@ -995,7 +830,7 @@
     async def address_get_mempool(self, params):
         hash168 = self.params_to_hash168(params)
-        return self.unconfirmed_history(hash168)
+        return await self.unconfirmed_history(hash168)

     async def address_get_proof(self, params):
         hash168 = self.params_to_hash168(params)
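
A small aside on the height convention used by unconfirmed_history() above: Electrum reports height -1 for a mempool tx that spends unconfirmed inputs and 0 otherwise, which the expression -unconfirmed encodes directly, since Python negates booleans to -1 and 0. The values below are made up:

mempool = [('aa' * 32, 1000, True),   # fee 1000, spends unconfirmed inputs
           ('bb' * 32, 2000, False)]  # all inputs confirmed

history = [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
           for tx_hash, fee, unconfirmed in mempool]

assert [h['height'] for h in history] == [-1, 0]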
