diff --git a/server/block_processor.py b/server/block_processor.py index 1412cbc..c58284a 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -11,6 +11,7 @@ import array import ast import asyncio +import ssl import struct import time from bisect import bisect_left @@ -19,6 +20,7 @@ from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.daemon import DaemonError +from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass @@ -288,13 +290,14 @@ class BlockProcessor(server.db.DB): Coordinate backing up in case of chain reorganisations. ''' - def __init__(self, env, daemon, on_update=None): + def __init__(self, env, daemon): '''on_update is awaitable, and called only when caught up with the daemon and a new block arrives or the mempool is updated.''' super().__init__(env.coin, env.db_engine) + daemon.debug_set_height(self.height) + self.env = env self.daemon = daemon - self.on_update = on_update self.mempool = MemPool(self) self.touched = set() @@ -330,7 +333,6 @@ class BlockProcessor(server.db.DB): self.clean_db() def coros(self): - self.daemon.debug_set_height(self.height) return [self.start(), self.prefetcher.start()] async def start(self): @@ -359,6 +361,7 @@ class BlockProcessor(server.db.DB): await asyncio.sleep(0) # Yield if caught_up: await self.caught_up(mempool_hashes) + self.touched = set() except ChainReorg: await self.handle_chain_reorg() @@ -370,10 +373,7 @@ class BlockProcessor(server.db.DB): if self.first_sync: self.first_sync = False self.logger.info('synced to height {:,d}'.format(self.height)) - if self.on_update: - self.touched.update(await self.mempool.update(mempool_hashes)) - await self.on_update(self.height, self.touched) - self.touched = set() + self.touched.update(await self.mempool.update(mempool_hashes)) async def handle_chain_reorg(self): # First get all state on disk @@ -803,3 +803,60 @@ class BlockProcessor(server.db.DB): Can be positive or negative. ''' return self.mempool.value(hash168) + + +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env, daemon): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env, daemon) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. + ''' + env = self.env + loop = asyncio.get_event_loop() + + JSONRPC.init(self, self.daemon, self.coin) + + protocol = LocalRPC + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(await rpc_server) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(await tcp_server) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() diff --git a/server/controller.py b/server/controller.py index a30aeb4..dffae97 100644 --- a/server/controller.py +++ b/server/controller.py @@ -12,12 +12,9 @@ client-serving data such as histories. ''' import asyncio -import ssl -from functools import partial from server.daemon import Daemon -from server.block_processor import BlockProcessor -from server.protocol import ElectrumX, LocalRPC, JSONRPC +from server.block_processor import BlockServer from lib.util import LoggedClass @@ -33,10 +30,7 @@ class Controller(LoggedClass): self.env = env self.coin = env.coin self.daemon = Daemon(env.daemon_url, env.debug) - self.block_processor = BlockProcessor(env, self.daemon, - on_update=self.on_update) - JSONRPC.init(self.block_processor, self.daemon, self.coin) - self.servers = [] + self.block_processor = BlockServer(env, self.daemon) def start(self): '''Prime the event loop with asynchronous jobs.''' @@ -45,50 +39,6 @@ class Controller(LoggedClass): for coro in coros: asyncio.ensure_future(coro) - async def on_update(self, height, touched): - if not self.servers: - self.servers = await self.start_servers() - ElectrumX.notify(height, touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. Does - nothing if servers are already running. - ''' - servers = [] - env = self.env - loop = self.loop - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - return servers - def stop(self): '''Close the listening servers.''' - for server in self.servers: - server.close() + self.block_processor.stop()