
Merge ServerManager and BlockServer

Neil Booth committed 9 years ago (branch master, parent commit bab0d162de)

Files changed:
  1. electrumx_server.py (4 changed lines)
  2. server/block_processor.py (33 changed lines)
  3. server/protocol.py (138 changed lines)
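This commit dissolves the BlockServer class, which subclassed BlockProcessor to bolt on a mempool and a server manager. ServerManager is now the top-level object: it constructs and owns both the BlockProcessor and the MemPool, and the block processor calls back into the client passed to its constructor instead of relying on methods overridden in a derived class. Session code reaches mempool data through the manager.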

electrumx_server.py

@@ -17,7 +17,7 @@ import traceback
 from functools import partial

 from server.env import Env
-from server.protocol import BlockServer
+from server.protocol import ServerManager

 SUPPRESS_MESSAGES = [
     'Fatal read error on socket transport',
@@ -45,7 +45,7 @@ def main_loop():
                 'accept_connection2()' in repr(context.get('task'))):
             loop.default_exception_handler(context)

-    server = BlockServer(Env())
+    server = ServerManager(Env())
     future = asyncio.ensure_future(server.main_loop())

     # Install signal handlers

server/block_processor.py

@@ -133,9 +133,11 @@ class BlockProcessor(server.db.DB):
     Coordinate backing up in case of chain reorganisations.
     '''

-    def __init__(self, env):
+    def __init__(self, client, env):
         super().__init__(env)

+        self.client = client
+
         # These are our state as we move ahead of DB state
         self.fs_height = self.db_height
         self.fs_tx_count = self.db_tx_count
@@ -146,7 +148,6 @@ class BlockProcessor(server.db.DB):
         self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
         self.caught_up = False
         self.touched = set()
-        self.futures = []

         # Meta
         self.utxo_MB = env.utxo_MB
@@ -185,7 +186,7 @@ class BlockProcessor(server.db.DB):
         Safely flushes the DB on clean shutdown.
         '''
-        self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop()))
+        prefetcher_loop = asyncio.ensure_future(self.prefetcher.main_loop())

         # Simulate a reorg if requested
         if self.env.force_reorg > 0:
@@ -197,20 +198,11 @@ class BlockProcessor(server.db.DB):
             while True:
                 await self._wait_for_update()
         except asyncio.CancelledError:
-            self.on_cancel()
-            await self.wait_shutdown()
-
-    def on_cancel(self):
-        '''Called when the main loop is cancelled.
-        Intended to be overridden in derived classes.'''
-        for future in self.futures:
-            future.cancel()
+            pass
+
+        prefetcher_loop.cancel()
         self.flush(True)
+        await self.client.shutdown()

-    async def wait_shutdown(self):
-        '''Wait for shutdown to complete cleanly, and return.'''
-        await asyncio.sleep(0)
-
     async def _wait_for_update(self):
         '''Wait for the prefetcher to deliver blocks.
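The rewritten main_loop keeps ownership of the prefetcher task itself: cancellation falls through the except handler, the helper task is cancelled, state is flushed, and the rest of the shutdown is delegated to the owning client. A minimal, self-contained sketch of that pattern (the Worker and client names are illustrative, not from the codebase):

import asyncio

class Worker:
    '''Sketch: a main loop that owns a helper task, cleans up in-line
    when it is cancelled, then hands off to its owner.'''

    def __init__(self, client):
        self.client = client

    async def main_loop(self):
        helper = asyncio.ensure_future(self.helper_loop())
        try:
            while True:
                await asyncio.sleep(1)    # stand-in for _wait_for_update()
        except asyncio.CancelledError:
            pass                          # fall through to cleanup
        helper.cancel()                   # stop the task we started
        await self.client.shutdown()      # owner finishes the shutdown

    async def helper_loop(self):
        while True:
            await asyncio.sleep(1)        # stand-in for the prefetcher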
@@ -237,26 +229,21 @@ class BlockProcessor(server.db.DB):
                 # Flush everything as queries are performed on the DB and
                 # not in-memory.
                 self.flush(True)
-                self.notify(self.touched)
+                self.client.notify(self.touched)
             elif time.time() > self.next_cache_check:
                 self.check_cache_size()
                 self.next_cache_check = time.time() + 60
         self.touched = set()

     async def first_caught_up(self):
-        '''Called after each deamon poll if caught up.'''
+        '''Called when first caught up after start, or after a reorg.'''
         self.caught_up = True
         if self.first_sync:
             self.first_sync = False
             self.logger.info('{} synced to height {:,d}. DB version:'
                              .format(VERSION, self.height, self.db_version))
             self.flush(True)
+        await self.client.first_caught_up()

-    def notify(self, touched):
-        '''Called with list of touched addresses by new blocks.
-        Only called for blocks found after first_caught_up is called.
-        Intended to be overridden in derived classes.'''
-
     async def handle_chain_reorg(self, count):
         '''Handle a chain reorganisation.
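With the notify / on_cancel / wait_shutdown template methods gone, BlockProcessor's expectations of its client are implicit in the calls above. Spelled out as an interface, this is a reading of the contract, not a class that exists in the code:

class Client:
    '''What BlockProcessor now requires of the object handed to it
    (ServerManager provides all three methods in this commit).'''

    def notify(self, touched):
        '''Called with addresses touched by newly processed blocks.'''

    async def first_caught_up(self):
        '''Called once the processor has caught up with the daemon.'''

    async def shutdown(self):
        '''Called when the processor's main loop is cancelled.'''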

server/protocol.py

@@ -20,63 +20,14 @@ from functools import partial
 from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 from lib.jsonrpc import JSONRPC, json_notification_payload
 from lib.tx import Deserializer
-from lib.util import LoggedClass
+import lib.util as util
 from server.block_processor import BlockProcessor
 from server.daemon import DaemonError
 from server.irc import IRC
 from server.version import VERSION


-class BlockServer(BlockProcessor):
-    '''Like BlockProcessor but also has a mempool and a server manager.
-
-    Servers are started immediately the block processor first catches
-    up with the daemon.
-    '''
-
-    def __init__(self, env):
-        super().__init__(env)
-        self.server_mgr = ServerManager(self, env)
-        self.mempool = MemPool(self)
-
-    async def first_caught_up(self):
-        # Call the base class to flush and log first
-        await super().first_caught_up()
-        await self.server_mgr.start_servers()
-        self.futures.append(self.mempool.start())
-
-    def notify(self, touched):
-        '''Called when addresses are touched by new blocks or mempool
-        updates.'''
-        self.server_mgr.notify(self.height, touched)
-
-    def on_cancel(self):
-        '''Called when the main loop is cancelled.'''
-        self.server_mgr.stop()
-        super().on_cancel()
-
-    async def wait_shutdown(self):
-        '''Wait for shutdown to complete cleanly, and return.'''
-        await self.server_mgr.wait_shutdown()
-        await super().wait_shutdown()
-
-    def mempool_transactions(self, hash168):
-        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
-        entries for the hash168.
-
-        unconfirmed is True if any txin is unconfirmed.
-        '''
-        return self.mempool.transactions(hash168)
-
-    def mempool_value(self, hash168):
-        '''Return the unconfirmed amount in the mempool for hash168.
-
-        Can be positive or negative.
-        '''
-        return self.mempool.value(hash168)
-
-
-class MemPool(LoggedClass):
+class MemPool(util.LoggedClass):
     '''Representation of the daemon's mempool.

     Updated regularly in caught-up state. Goal is to enable efficient
@@ -91,11 +42,14 @@ class MemPool(LoggedClass):
     tx's txins are unconfirmed. tx hashes are hex strings.
     '''

-    def __init__(self, bp):
+    def __init__(self, daemon, coin, db, manager):
         super().__init__()
+        self.daemon = daemon
+        self.coin = coin
+        self.db = db
+        self.manager = manager
         self.txs = {}
         self.hash168s = defaultdict(set)  # None can be a key
-        self.bp = bp
         self.count = -1

     def start(self):
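MemPool previously reached all its collaborators through a single bp back-reference; they are now injected individually, so the constructor names the real dependencies. A condensed illustration of the difference (class names hypothetical):

class MemPoolBefore:
    def __init__(self, bp):
        self.bp = bp    # implicitly depends on bp.daemon, bp.coin,
                        # bp.db_utxo_lookup, bp.notify, ...

class MemPoolAfter:
    def __init__(self, daemon, coin, db, manager):
        self.daemon = daemon      # asks the daemon for mempool txs
        self.coin = coin          # coin-specific script hashing
        self.db = db              # confirmed UTXO lookups
        self.manager = manager    # notified when addresses are touched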
@@ -120,7 +74,7 @@ class MemPool(LoggedClass):
         Remove transactions that are no longer in our mempool.

         Request new transactions we don't have then add to our mempool.
         '''
-        hex_hashes = set(await self.bp.daemon.mempool_hashes())
+        hex_hashes = set(await self.daemon.mempool_hashes())
         touched = set()
         missing_utxos = []
@@ -145,7 +99,7 @@ class MemPool(LoggedClass):
         # ones the daemon no longer has (it will return None).  Put
         # them into a dictionary of hex hash to deserialized tx.
         hex_hashes.difference_update(self.txs)
-        raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
+        raw_txs = await self.daemon.getrawtransactions(hex_hashes)
         if initial:
             self.logger.info('analysing {:,d} mempool txs'
                              .format(len(raw_txs)))
@@ -155,8 +109,8 @@ class MemPool(LoggedClass):
         # The mempool is unordered, so process all outputs first so
         # that looking for inputs has full info.
-        script_hash168 = self.bp.coin.hash168_from_script()
-        db_utxo_lookup = self.bp.db_utxo_lookup
+        script_hash168 = self.coin.hash168_from_script()
+        db_utxo_lookup = self.db.db_utxo_lookup

         def txout_pair(txout):
             return (script_hash168(txout.pk_script), txout.value)
@@ -195,7 +149,7 @@ class MemPool(LoggedClass):
             try:
                 infos = (txin_info(txin) for txin in tx.inputs)
                 txin_pairs, unconfs = zip(*infos)
-            except self.bp.MissingUTXOError:
+            except self.db.MissingUTXOError:
                 # Drop this TX.  If other mempool txs depend on it
                 # it's harmless - next time the mempool is refreshed
                 # they'll either be cleaned up or the UTXOs will no
@@ -227,7 +181,7 @@ class MemPool(LoggedClass):
             self.logger.info('{:,d} txs touching {:,d} addresses'
                              .format(len(self.txs), len(self.hash168s)))

-        self.bp.notify(touched)
+        self.manager.notify(touched)

     def transactions(self, hash168):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@@ -254,14 +208,19 @@ class MemPool(LoggedClass):
         return value


-class ServerManager(LoggedClass):
-    '''Manages the servers.'''
+class ServerManager(util.LoggedClass):
+    '''Manages the client servers, a mempool, and a block processor.
+
+    Servers are started immediately the block processor first catches
+    up with the daemon.
+    '''

     MgrTask = namedtuple('MgrTask', 'session task')

-    def __init__(self, bp, env):
+    def __init__(self, env):
         super().__init__()
-        self.bp = bp
+        self.bp = BlockProcessor(self, env)
+        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
         self.env = env
         self.servers = []
         self.irc = IRC(env)
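The manager now sits at the top of the object graph, building the block processor with itself as the client and wiring the mempool to the processor's daemon and database. Reduced to the wiring alone (signatures simplified, bodies stubbed):

class MemPool:
    def __init__(self, daemon, coin, db, manager):
        self.daemon, self.coin = daemon, coin
        self.db, self.manager = db, manager

class BlockProcessor:
    def __init__(self, client, env):
        self.client = client    # the manager; called back on notify(),
                                # first_caught_up() and shutdown()
        self.daemon = None      # stand-in for the real Daemon instance

class ServerManager:
    def __init__(self, env):
        # Mutual references are explicit: the manager owns the
        # processor, and the processor reaches back via `client`.
        self.bp = BlockProcessor(self, env)
        self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)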
@@ -269,11 +228,27 @@ class ServerManager(util.LoggedClass):
         self.max_subs = env.max_subs
         self.subscription_count = 0
         self.irc_future = None
+        self.mempool_future = None
         self.logger.info('max subscriptions across all sessions: {:,d}'
                          .format(self.max_subs))
         self.logger.info('max subscriptions per session: {:,d}'
                          .format(env.max_session_subs))

+    def mempool_transactions(self, hash168):
+        '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
+        entries for the hash168.
+
+        unconfirmed is True if any txin is unconfirmed.
+        '''
+        return self.mempool.transactions(hash168)
+
+    def mempool_value(self, hash168):
+        '''Return the unconfirmed amount in the mempool for hash168.
+
+        Can be positive or negative.
+        '''
+        return self.mempool.value(hash168)
+
     async def start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
         protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
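The two forwarding methods make ServerManager a facade: sessions query the manager and never hold a reference to the MemPool or BlockProcessor. A toy version showing how a caller exercises the delegation (FakeMemPool is hypothetical, for illustration):

class FakeMemPool:
    def transactions(self, hash168):
        return []

    def value(self, hash168):
        return 0

class Manager:
    '''Sketch of the delegation the session methods rely on.'''
    def __init__(self):
        self.mempool = FakeMemPool()

    def mempool_transactions(self, hash168):
        return self.mempool.transactions(hash168)

    def mempool_value(self, hash168):
        return self.mempool.value(hash168)

# A session holding only `manager` can still answer balance queries:
manager = Manager()
assert manager.mempool_value(hash168=b'') == 0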
@@ -315,35 +290,40 @@ class ServerManager(util.LoggedClass):
         else:
             self.logger.info('IRC disabled')

-    def notify(self, height, touched):
+    async def first_caught_up(self):
+        if not self.mempool_future:
+            self.mempool_future = self.mempool.start()
+        await self.start_servers()
+
+    def notify(self, touched):
         '''Notify sessions about height changes and touched addresses.'''
         cache = {}
         for session in self.sessions:
             if isinstance(session, ElectrumX):
                 # Use a tuple to distinguish from JSON
-                session.jobs.put_nowait((height, touched, cache))
+                session.jobs.put_nowait((self.bp.height, touched, cache))

-    def stop(self):
-        '''Close listening servers.'''
+    async def shutdown(self):
+        '''Call to shutdown the servers.  Returns when done.'''
         for server in self.servers:
             server.close()
+        for server in self.servers:
+            await server.wait_closed()
+        self.servers = []
         if self.irc_future:
             self.irc_future.cancel()
+        if self.mempool_future:
+            self.mempool_future.cancel()
         if self.sessions:
+            await self.close_sessions()

+    async def close_sessions(self, secs=60):
         self.logger.info('cleanly closing client sessions, please wait...')
         for session in self.sessions:
             self.close_session(session)
-
-    async def wait_shutdown(self):
-        # Wait for servers to close
-        for server in self.servers:
-            await server.wait_closed()
-        self.servers = []
-
-        secs = 60
         self.logger.info('server listening sockets closed, waiting '
                          '{:d} seconds for socket cleanup'.format(secs))
         limit = time.time() + secs
         while self.sessions and time.time() < limit:
             await asyncio.sleep(4)
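shutdown() folds the old stop() / wait_shutdown() pair into one sequence: close the listening sockets, wait for them to finish closing, cancel background futures, then give client sessions a bounded time to drain. A standalone sketch of that ordering (names illustrative; servers are asyncio Server objects):

import asyncio
import time

async def shutdown(servers, futures, sessions, secs=60):
    '''Sketch of the consolidated shutdown ordering.'''
    for server in servers:        # stop accepting new connections
        server.close()
    for server in servers:        # wait until the sockets are closed
        await server.wait_closed()
    for future in futures:        # e.g. the IRC and mempool futures
        future.cancel()
    limit = time.time() + secs    # bounded wait for sessions to drain
    while sessions and time.time() < limit:
        await asyncio.sleep(4)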
@@ -628,7 +608,7 @@ class ElectrumX(Session):
         # Note history is ordered and mempool unordered in electrum-server
         # For mempool, height is -1 if unconfirmed txins, otherwise 0
         history = await self.async_get_history(hash168)
-        mempool = self.bp.mempool_transactions(hash168)
+        mempool = self.manager.mempool_transactions(hash168)

         status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
                          for tx_hash, height in history)
@@ -666,7 +646,7 @@ class ElectrumX(Session):
     def unconfirmed_history(self, hash168):
         # Note unconfirmed history is unordered in electrum-server
         # Height is -1 if unconfirmed txins, otherwise 0
-        mempool = self.bp.mempool_transactions(hash168)
+        mempool = self.manager.mempool_transactions(hash168)
         return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
                 for tx_hash, fee, unconfirmed in mempool]
@@ -707,7 +687,7 @@ class ElectrumX(Session):
     async def get_balance(self, hash168):
         utxos = await self.get_utxos(hash168)
         confirmed = sum(utxo.value for utxo in utxos)
-        unconfirmed = self.bp.mempool_value(hash168)
+        unconfirmed = self.manager.mempool_value(hash168)
         return {'confirmed': confirmed, 'unconfirmed': unconfirmed}

     async def list_unspent(self, hash168):
