Browse Source

Create BlockServer

Controller now an empty shell
master
Neil Booth 8 years ago
parent
commit
2b028cc065
  1. 69
      server/block_processor.py
  2. 56
      server/controller.py

69
server/block_processor.py

@ -11,6 +11,7 @@
import array import array
import ast import ast
import asyncio import asyncio
import ssl
import struct import struct
import time import time
from bisect import bisect_left from bisect import bisect_left
@ -19,6 +20,7 @@ from functools import partial
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError from server.daemon import DaemonError
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.tx import Deserializer from lib.tx import Deserializer
from lib.util import chunks, LoggedClass from lib.util import chunks, LoggedClass
@ -288,13 +290,14 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations. Coordinate backing up in case of chain reorganisations.
''' '''
def __init__(self, env, daemon, on_update=None): def __init__(self, env, daemon):
'''on_update is awaitable, and called only when caught up with the '''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.''' daemon and a new block arrives or the mempool is updated.'''
super().__init__(env.coin, env.db_engine) super().__init__(env.coin, env.db_engine)
daemon.debug_set_height(self.height)
self.env = env
self.daemon = daemon self.daemon = daemon
self.on_update = on_update
self.mempool = MemPool(self) self.mempool = MemPool(self)
self.touched = set() self.touched = set()
@ -330,7 +333,6 @@ class BlockProcessor(server.db.DB):
self.clean_db() self.clean_db()
def coros(self): def coros(self):
self.daemon.debug_set_height(self.height)
return [self.start(), self.prefetcher.start()] return [self.start(), self.prefetcher.start()]
async def start(self): async def start(self):
@ -359,6 +361,7 @@ class BlockProcessor(server.db.DB):
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
if caught_up: if caught_up:
await self.caught_up(mempool_hashes) await self.caught_up(mempool_hashes)
self.touched = set()
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg() await self.handle_chain_reorg()
@ -370,10 +373,7 @@ class BlockProcessor(server.db.DB):
if self.first_sync: if self.first_sync:
self.first_sync = False self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height)) self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
self.touched.update(await self.mempool.update(mempool_hashes)) self.touched.update(await self.mempool.update(mempool_hashes))
await self.on_update(self.height, self.touched)
self.touched = set()
async def handle_chain_reorg(self): async def handle_chain_reorg(self):
# First get all state on disk # First get all state on disk
@ -803,3 +803,60 @@ class BlockProcessor(server.db.DB):
Can be positive or negative. Can be positive or negative.
''' '''
return self.mempool.value(hash168) return self.mempool.value(hash168)
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env, daemon):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env, daemon)
self.servers = []
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
ElectrumX.notify(self.height, self.touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified.
'''
env = self.env
loop = asyncio.get_event_loop()
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
self.servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()

56
server/controller.py

@ -12,12 +12,9 @@ client-serving data such as histories.
''' '''
import asyncio import asyncio
import ssl
from functools import partial
from server.daemon import Daemon from server.daemon import Daemon
from server.block_processor import BlockProcessor from server.block_processor import BlockServer
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.util import LoggedClass from lib.util import LoggedClass
@ -33,10 +30,7 @@ class Controller(LoggedClass):
self.env = env self.env = env
self.coin = env.coin self.coin = env.coin
self.daemon = Daemon(env.daemon_url, env.debug) self.daemon = Daemon(env.daemon_url, env.debug)
self.block_processor = BlockProcessor(env, self.daemon, self.block_processor = BlockServer(env, self.daemon)
on_update=self.on_update)
JSONRPC.init(self.block_processor, self.daemon, self.coin)
self.servers = []
def start(self): def start(self):
'''Prime the event loop with asynchronous jobs.''' '''Prime the event loop with asynchronous jobs.'''
@ -45,50 +39,6 @@ class Controller(LoggedClass):
for coro in coros: for coro in coros:
asyncio.ensure_future(coro) asyncio.ensure_future(coro)
async def on_update(self, height, touched):
if not self.servers:
self.servers = await self.start_servers()
ElectrumX.notify(height, touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified. Does
nothing if servers are already running.
'''
servers = []
env = self.env
loop = self.loop
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
return servers
def stop(self): def stop(self):
'''Close the listening servers.''' '''Close the listening servers.'''
for server in self.servers: self.block_processor.stop()
server.close()

Loading…
Cancel
Save