From 5c5e90d5740b19ecf1dc40c3e3ef4653ee11fc8e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 8 Nov 2016 19:20:13 +0900 Subject: [PATCH] Move BlockServer to more appropriate location --- electrumx_server.py | 2 +- server/block_processor.py | 62 +-------------------------------------- server/db.py | 1 + server/protocol.py | 59 +++++++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 62 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index 94d65c8..6a817f8 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -17,7 +17,7 @@ import traceback from functools import partial from server.env import Env -from server.block_processor import BlockServer +from server.protocol import BlockServer def close_loop(loop): diff --git a/server/block_processor.py b/server/block_processor.py index 0309491..23dffcd 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -9,18 +9,15 @@ import array -import ast import asyncio -import ssl import struct import time from bisect import bisect_left -from collections import defaultdict, namedtuple +from collections import defaultdict from functools import partial from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.daemon import Daemon, DaemonError -from server.protocol import ElectrumX, LocalRPC, JSONRPC from lib.hash import hash_to_str from lib.tx import Deserializer from lib.util import chunks, LoggedClass @@ -808,60 +805,3 @@ class BlockProcessor(server.db.DB): Can be positive or negative. ''' return self.mempool.value(hash168) - - -class BlockServer(BlockProcessor): - '''Like BlockProcessor but also starts servers when caught up.''' - - def __init__(self, env): - '''on_update is awaitable, and called only when caught up with the - daemon and a new block arrives or the mempool is updated.''' - super().__init__(env) - self.servers = [] - - async def caught_up(self, mempool_hashes): - await super().caught_up(mempool_hashes) - if not self.servers: - await self.start_servers() - ElectrumX.notify(self.height, self.touched) - - async def start_servers(self): - '''Start listening on RPC, TCP and SSL ports. - - Does not start a server if the port wasn't specified. - ''' - env = self.env - loop = asyncio.get_event_loop() - - JSONRPC.init(self, self.daemon, self.coin) - - protocol = LocalRPC - if env.rpc_port is not None: - host = 'localhost' - rpc_server = loop.create_server(protocol, host, env.rpc_port) - self.servers.append(await rpc_server) - self.logger.info('RPC server listening on {}:{:d}' - .format(host, env.rpc_port)) - - protocol = partial(ElectrumX, env) - if env.tcp_port is not None: - tcp_server = loop.create_server(protocol, env.host, env.tcp_port) - self.servers.append(await tcp_server) - self.logger.info('TCP server listening on {}:{:d}' - .format(env.host, env.tcp_port)) - - if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - ssl_context.load_cert_chain(env.ssl_certfile, - keyfile=env.ssl_keyfile) - ssl_server = loop.create_server(protocol, env.host, env.ssl_port, - ssl=ssl_context) - self.servers.append(await ssl_server) - self.logger.info('SSL server listening on {}:{:d}' - .format(env.host, env.ssl_port)) - - def stop(self): - '''Close the listening servers.''' - for server in self.servers: - server.close() diff --git a/server/db.py b/server/db.py index 0256ff4..9215dbd 100644 --- a/server/db.py +++ b/server/db.py @@ -7,6 +7,7 @@ '''Interface to the blockchain database.''' + import array import ast import os diff --git a/server/protocol.py b/server/protocol.py index 53c9572..0ea78f6 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -11,10 +11,12 @@ import asyncio import codecs import json +import ssl import traceback from collections import namedtuple from functools import partial +from server.block_processor import BlockProcessor from server.daemon import DaemonError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.util import LoggedClass @@ -30,6 +32,63 @@ def json_notification(method, params): return {'id': None, 'method': method, 'params': params} +class BlockServer(BlockProcessor): + '''Like BlockProcessor but also starts servers when caught up.''' + + def __init__(self, env): + '''on_update is awaitable, and called only when caught up with the + daemon and a new block arrives or the mempool is updated.''' + super().__init__(env) + self.servers = [] + + async def caught_up(self, mempool_hashes): + await super().caught_up(mempool_hashes) + if not self.servers: + await self.start_servers() + ElectrumX.notify(self.height, self.touched) + + async def start_servers(self): + '''Start listening on RPC, TCP and SSL ports. + + Does not start a server if the port wasn't specified. + ''' + env = self.env + loop = asyncio.get_event_loop() + + JSONRPC.init(self, self.daemon, self.coin) + + protocol = LocalRPC + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(await rpc_server) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(await tcp_server) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + # FIXME: update if we want to require Python >= 3.5.3 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + ssl_context.load_cert_chain(env.ssl_certfile, + keyfile=env.ssl_keyfile) + ssl_server = loop.create_server(protocol, env.host, env.ssl_port, + ssl=ssl_context) + self.servers.append(await ssl_server) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + def stop(self): + '''Close the listening servers.''' + for server in self.servers: + server.close() + + AsyncTask = namedtuple('AsyncTask', 'session job') class SessionManager(LoggedClass):