# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.

'''Server controller.

Coordinates the parts of the server. Serves as a cache for
client-serving data such as histories.
'''

import asyncio
import signal
import ssl
import traceback
from functools import partial

from server.daemon import Daemon, DaemonError
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC, RPCError, JSONRPC
from lib.hash import (sha256, double_sha256, hash_to_str,
                      Base58, hex_str_to_hash)
from lib.util import LoggedClass


class Controller(LoggedClass):

    def __init__(self, loop, env):
        '''Set up the controller.

        Creates DB, Daemon and BlockProcessor instances.
        '''
        super().__init__()
        self.loop = loop
        self.env = env
        self.coin = env.coin
        self.daemon = Daemon(env.daemon_url)
        self.block_processor = BlockProcessor(env, self.daemon,
                                              on_update=self.on_update)
        JSONRPC.init(self.block_processor, self.coin)
        self.servers = []
        self.addresses = {}
        self.jobs = asyncio.Queue()
        self.peers = {}

    def start(self):
        '''Prime the event loop with asynchronous jobs.'''
        coros = self.block_processor.coros()
        coros.append(self.run_jobs())

        for coro in coros:
            asyncio.ensure_future(coro)

        # Signal handlers
        for signame in ('SIGINT', 'SIGTERM'):
            self.loop.add_signal_handler(getattr(signal, signame),
                                         partial(self.on_signal, signame))
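
    # Illustrative startup sketch (the real entry point lives outside this
    # module; the names below are assumptions, not part of this file):
    #
    #   loop = asyncio.get_event_loop()
    #   controller = Controller(loop, env)
    #   controller.start()
    #   try:
    #       loop.run_forever()
    #   finally:
    #       controller.stop()
    #       loop.close()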

    async def on_update(self, height, touched):
        if not self.servers:
            self.servers = await self.start_servers()
        ElectrumX.notify(height, touched)
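
    # The listening servers are created lazily on the first update from the
    # block processor; subsequent updates only notify sessions of the new
    # height and the set of touched addresses.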

    async def start_servers(self):
        '''Start listening on RPC, TCP and SSL ports.

        Does not start a server if the port wasn't specified. Does
        nothing if servers are already running.
        '''
        servers = []
        env = self.env
        loop = self.loop

        protocol = partial(LocalRPC, self)
        if env.rpc_port is not None:
            host = 'localhost'
            rpc_server = loop.create_server(protocol, host, env.rpc_port)
            servers.append(await rpc_server)
            self.logger.info('RPC server listening on {}:{:d}'
                             .format(host, env.rpc_port))

        protocol = partial(ElectrumX, self, self.daemon, env)
        if env.tcp_port is not None:
            tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
            servers.append(await tcp_server)
            self.logger.info('TCP server listening on {}:{:d}'
                             .format(env.host, env.tcp_port))

        if env.ssl_port is not None:
            # FIXME: update if we want to require Python >= 3.5.3
            ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            ssl_context.load_cert_chain(env.ssl_certfile,
                                        keyfile=env.ssl_keyfile)
            ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
                                            ssl=ssl_context)
            servers.append(await ssl_server)
            self.logger.info('SSL server listening on {}:{:d}'
                             .format(env.host, env.ssl_port))

        return servers
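
    # Note on the partial() factories above: loop.create_server() invokes the
    # protocol factory with no arguments for each new connection, so the
    # session constructor arguments are bound up front with functools.partial.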

    def stop(self):
        '''Close the listening servers.'''
        for server in self.servers:
            server.close()

    def on_signal(self, signame):
        '''Call on receipt of a signal to shut down cleanly.'''
        self.logger.warning('received {} signal, preparing to shut down'
                            .format(signame))
        for task in asyncio.Task.all_tasks(self.loop):
            task.cancel()

    def add_job(self, coro):
        '''Queue a job for asynchronous processing.'''
        self.jobs.put_nowait(coro)
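
    # Illustrative call site (hypothetical; not part of this module): a
    # session handler might hand work off to the controller rather than
    # awaiting it inline, e.g.
    #   controller.add_job(session.handle_request(message))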

    async def run_jobs(self):
        '''Asynchronously run through the job queue.'''
        while True:
            job = await self.jobs.get()
            try:
                await job
            except asyncio.CancelledError:
                raise
            except Exception:
                # Getting here should probably be considered a bug and fixed
                traceback.print_exc()
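
    # Re-raising CancelledError above lets the task cancellation issued by
    # on_signal() stop this loop during shutdown; any other exception is
    # logged with a traceback and the queue continues to be serviced.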

    def address_status(self, hash168):
        '''Return the status of an address as 32 bytes, or an empty string
        if it has no history.'''
        status = self.addresses.get(hash168)
        if status is None:
            history = self.block_processor.get_history(hash168)
            status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
                             for tx_hash, height in history)
            if status:
                status = sha256(status.encode())
            self.addresses[hash168] = status
        return status
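
    # The value above is the Electrum address status: the confirmed history
    # serialised as concatenated 'tx_hash:height:' strings, then hashed with
    # sha256. Sketch with hypothetical values:
    #   status = sha256(b'aa..11:100:bb..22:105:')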

    async def get_merkle(self, tx_hash, height):
        '''Return the merkle branch of a confirmed transaction.

        tx_hash is a hex string; height is the height of its block.
        '''
        block_hash = await self.daemon.send_single('getblockhash', (height,))
        block = await self.daemon.send_single('getblock', (block_hash, True))
        tx_hashes = block['tx']
        # This will throw if the tx_hash is bad
        pos = tx_hashes.index(tx_hash)

        idx = pos
        hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
        merkle_branch = []
        while len(hashes) > 1:
            if len(hashes) & 1:
                hashes.append(hashes[-1])
            idx = idx - 1 if (idx & 1) else idx + 1
            merkle_branch.append(hash_to_str(hashes[idx]))
            idx //= 2
            hashes = [double_sha256(hashes[n] + hashes[n + 1])
                      for n in range(0, len(hashes), 2)]

        return {"block_height": height, "merkle": merkle_branch, "pos": pos}

    def get_peers(self):
        '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
        per peer.'''
        return self.peers

    def height(self):
        return self.block_processor.height

    def get_history(self, hash168):
        history = self.block_processor.get_history(hash168, limit=None)
        return [
            {'tx_hash': hash_to_str(tx_hash), 'height': height}
            for tx_hash, height in history
        ]

    def get_chunk(self, index):
        '''Return header chunk as hex. Index is a non-negative integer.'''
        chunk_size = self.coin.CHUNK_SIZE
        next_height = self.height() + 1
        start_height = min(index * chunk_size, next_height)
        count = min(next_height - start_height, chunk_size)
        return self.block_processor.read_headers(start_height, count).hex()
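
    # Worked example: if the coin's CHUNK_SIZE is 2016 (Bitcoin's difficulty
    # retarget interval), get_chunk(1) covers heights 2016..4031, truncated
    # when the chain tip is below 4031.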

    def get_balance(self, hash168):
        confirmed = self.block_processor.get_balance(hash168)
        unconfirmed = -1  # FIXME
        return {'confirmed': confirmed, 'unconfirmed': unconfirmed}

    def list_unspent(self, hash168):
        utxos = self.block_processor.get_utxos_sorted(hash168)
        return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
                      'tx_pos': utxo.tx_pos, 'height': utxo.height,
                      'value': utxo.value}
                     for utxo in utxos)
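
    # Example of a single entry in the tuple returned above (hypothetical
    # values):
    #   {'tx_hash': '9f4c...e1', 'tx_pos': 0, 'height': 412000, 'value': 50000}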