
Merge branch 'release-0.7.5'

master 0.7.5
Neil Booth 8 years ago
parent commit 35367c84d0
  1. docs/RELEASE-NOTES (14 changed lines)
  2. electrumx_server.py (4 changed lines)
  3. lib/coins.py (8 changed lines)
  4. lib/jsonrpc.py (5 changed lines)
  5. server/block_processor.py (56 changed lines)
  6. server/irc.py (33 changed lines)
  7. server/protocol.py (179 changed lines)
  8. server/version.py (2 changed lines)

docs/RELEASE-NOTES (14 changed lines)

@@ -1,3 +1,17 @@
version 0.7.5
-------------
- refactoring of server manager and event handling. One side effect
is to fix a bug in 0.7.4 where after a reorg ElectrumX might create
a second mempool and/or kick off more servers. Your testing would
be appreciated. This is part of the refactoring necessary to
process incoming blocks asynchronously so client connections are not
left waiting for several seconds
- close connections on first bad JSON encoding. Do not process buffered
requests of a closing connection
- make IRC params a function of the coin (TheLazier) and supply them for
Dash
version 0.7.4
-------------
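The refactoring described in the 0.7.5 notes centres on a single "caught up" event: the block processor sets it once it has synced with the daemon, and the mempool, IRC and listening servers wait on that event before starting, so a reorg can no longer spawn a second mempool or extra servers. A minimal, runnable sketch of that gating pattern, with illustrative names rather than the actual ElectrumX classes:

import asyncio

class Manager:
    '''Toy supervisor: services start only after the block processor
    signals it has caught up with the daemon.'''
    def __init__(self):
        self.caught_up = asyncio.Event()

    async def block_processor(self):
        await asyncio.sleep(0.1)       # stand-in for the initial block sync
        self.caught_up.set()           # signalled exactly once

    async def mempool(self):
        await self.caught_up.wait()    # start only once the event is set
        print('mempool running')

    async def main(self):
        await asyncio.gather(self.block_processor(), self.mempool())

asyncio.run(Manager().main())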

electrumx_server.py (4 changed lines)

@@ -17,7 +17,7 @@ import traceback
from functools import partial
from server.env import Env
from server.protocol import BlockServer
from server.protocol import ServerManager
SUPPRESS_MESSAGES = [
'Fatal read error on socket transport',
@@ -45,7 +45,7 @@ def main_loop():
'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context)
server = BlockServer(Env())
server = ServerManager(Env())
future = asyncio.ensure_future(server.main_loop())
# Install signal handlers

lib/coins.py (8 changed lines)

@@ -39,6 +39,11 @@ class Coin(object):
VALUE_PER_COIN = 100000000
CHUNK_SIZE=2016
STRANGE_VERBYTE = 0xff
# IRC Defaults
IRC_PREFIX = "E_"
IRC_CHANNEL = "#electrum"
IRC_SERVER = "irc.freenode.net"
IRC_PORT = 6667
@classmethod
def lookup_coin_class(cls, name, net):
@@ -359,6 +364,8 @@ class Dash(Coin):
TX_COUNT = 2157510
TX_PER_BLOCK = 4
DEFAULT_RPC_PORT = 9998
IRC_PREFIX = "D_"
IRC_CHANNEL = "#electrum-dash"
@classmethod
def header_hashes(cls, header):
@@ -381,3 +388,4 @@ class DashTestnet(Dash):
TX_COUNT = 132681
TX_PER_BLOCK = 1
DEFAULT_RPC_PORT = 19998
IRC_PREFIX = "d_"

lib/jsonrpc.py (5 changed lines)

@@ -97,6 +97,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
decode_message for handling.
'''
self.recv_size += len(data)
if self.transport.is_closing():
return
while True:
npos = data.find(ord('\n'))
if npos == -1:
@@ -118,6 +121,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR)
self.transport.close()
return
try:
@@ -125,6 +129,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR)
self.transport.close()
return
self.on_json_request(message)
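The hunks above implement the "close on first bad JSON encoding" behaviour from the release notes. A rough, simplified sketch of that shape in an asyncio.Protocol (partial-line buffering and the JSON error reply are omitted):

import asyncio, json

class LineProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        if self.transport.is_closing():
            return                      # drop buffered requests of a closing connection
        for line in data.split(b'\n'):
            if not line:
                continue
            try:
                message = json.loads(line.decode())
            except (UnicodeDecodeError, json.JSONDecodeError):
                self.transport.close()  # first undecodable message closes the socket
                return
            print('request', message)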

server/block_processor.py (56 changed lines)

@@ -133,9 +133,11 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env):
def __init__(self, client, env):
super().__init__(env)
self.client = client
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
@@ -145,8 +147,8 @@
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self.event = asyncio.Event()
self.touched = set()
self.futures = []
# Meta
self.utxo_MB = env.utxo_MB
@@ -181,37 +183,24 @@
self.clean_db()
async def main_loop(self):
'''Main loop for block processing.
Safely flushes the DB on clean shutdown.
'''
self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop()))
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
'''Main loop for block processing.'''
try:
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
while True:
await self._wait_for_update()
except asyncio.CancelledError:
self.on_cancel()
await self.wait_shutdown()
def on_cancel(self):
'''Called when the main loop is cancelled.
pass
Intended to be overridden in derived classes.'''
for future in self.futures:
future.cancel()
async def shutdown(self):
'''Shut down the DB cleanly.'''
self.logger.info('flushing state to DB for clean shutdown...')
self.flush(True)
async def wait_shutdown(self):
'''Wait for shutdown to complete cleanly, and return.'''
await asyncio.sleep(0)
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks.
@@ -219,7 +208,7 @@ class BlockProcessor(server.db.DB):
'''
blocks = await self.prefetcher.get_blocks()
if not blocks:
await self.first_caught_up()
self.first_caught_up()
return
'''Strip the unspendable genesis coinbase.'''
@@ -237,26 +226,21 @@
# Flush everything as queries are performed on the DB and
# not in-memory.
self.flush(True)
self.notify(self.touched)
self.client.notify(self.touched)
elif time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 60
self.touched = set()
async def first_caught_up(self):
'''Called after each deamon poll if caught up.'''
def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.'''
self.caught_up = True
if self.first_sync:
self.first_sync = False
self.logger.info('{} synced to height {:,d}. DB version:'
.format(VERSION, self.height, self.db_version))
self.flush(True)
def notify(self, touched):
'''Called with list of touched addresses by new blocks.
Only called for blocks found after first_caught_up is called.
Intended to be overridden in derived classes.'''
self.event.set()
async def handle_chain_reorg(self, count):
'''Handle a chain reorganisation.
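The constructor change in this file replaces subclass overrides with plain delegation: the block processor is handed the object that manages it (the "client") and forwards touched addresses to it. A tiny illustrative sketch of that wiring, not the real interfaces:

class BlockProcessor:
    def __init__(self, client):
        self.client = client               # the manager that created us
        self.touched = set()

    def advance_block(self, touched):
        self.touched.update(touched)
        self.client.notify(self.touched)   # manager fans out to sessions
        self.touched = set()

class ServerManager:
    def __init__(self):
        self.bp = BlockProcessor(self)

    def notify(self, touched):
        print('notifying sessions of', len(touched), 'touched addresses')

ServerManager().bp.advance_block({'addr1', 'addr2'})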

server/irc.py (33 changed lines)

@@ -30,7 +30,6 @@ def port_text(letter, port, default):
class IRC(LoggedClass):
PEER_REGEXP = re.compile('(E_[^!]*)!')
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception):
@@ -45,14 +44,26 @@ class IRC(LoggedClass):
version = '1.0'
self.real_name = '{} v{} {} {}'.format(env.report_host, version,
tcp_text, ssl_text)
self.nick = 'E_{}'.format(env.irc_nick if env.irc_nick else
self.prefix = env.coin.IRC_PREFIX
self.nick = '{}{}'.format(self.prefix,
env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode())
[:5].hex())
self.channel = env.coin.IRC_CHANNEL
self.irc_server = env.coin.IRC_SERVER
self.irc_port = env.coin.IRC_PORT
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {}
self.disabled = env.irc is None
async def start(self):
async def start(self, caught_up):
'''Start IRC connections once caught up if enabled in environment.'''
await caught_up.wait()
try:
await self.join()
if self.disabled:
self.logger.info('IRC is disabled')
else:
await self.join()
except asyncio.CancelledError:
pass
except Exception as e:
@@ -72,7 +83,7 @@ class IRC(LoggedClass):
while True:
try:
connection = reactor.server()
connection.connect('irc.freenode.net', 6667,
connection.connect(self.irc_server, self.irc_port,
self.nick, ircname=self.real_name)
connection.set_keepalive(60)
while True:
@@ -89,8 +100,8 @@
.format(event.type, event.source, event.arguments))
def on_welcome(self, connection, event):
'''Called when we connect to freenode.'''
connection.join('#electrum')
'''Called when we connect to irc server.'''
connection.join(self.channel)
def on_disconnect(self, connection, event):
'''Called if we are disconnected.'''
@@ -99,20 +110,20 @@
def on_join(self, connection, event):
'''Called when someone new connects to our channel, including us.'''
match = self.PEER_REGEXP.match(event.source)
match = self.peer_regexp.match(event.source)
if match:
connection.who(match.group(1))
def on_quit(self, connection, event):
'''Called when someone leaves our channel.'''
match = self.PEER_REGEXP.match(event.source)
match = self.peer_regexp.match(event.source)
if match:
self.peers.pop(match.group(1), None)
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.PEER_REGEXP.match(event.arguments[0])
match = self.peer_regexp.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
@@ -123,7 +134,7 @@ class IRC(LoggedClass):
The users are space-separated in the 2nd argument.
'''
for peer in event.arguments[2].split():
if peer.startswith("E_"):
if peer.startswith(self.prefix):
connection.who(peer)
def on_whoreply(self, connection, event):
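With the prefix now coming from the coin, the peer regexp is built at runtime rather than being the hard-coded E_ pattern. A short sketch of the matching (re.escape is added here as a precaution; the hunk formats the prefix in directly):

import re

prefix = "D_"                                   # e.g. Dash's IRC_PREFIX
peer_regexp = re.compile('({}[^!]*)!'.format(re.escape(prefix)))

match = peer_regexp.match('D_somehost!user@irc.freenode.net')
assert match and match.group(1) == 'D_somehost'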

server/protocol.py (179 changed lines)

@@ -20,63 +20,14 @@ from functools import partial
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.jsonrpc import JSONRPC, json_notification_payload
from lib.tx import Deserializer
from lib.util import LoggedClass
import lib.util as util
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from server.irc import IRC
from server.version import VERSION
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also has a mempool and a server manager.
Servers are started immediately the block processor first catches
up with the daemon.
'''
def __init__(self, env):
super().__init__(env)
self.server_mgr = ServerManager(self, env)
self.mempool = MemPool(self)
async def first_caught_up(self):
# Call the base class to flush and log first
await super().first_caught_up()
await self.server_mgr.start_servers()
self.futures.append(self.mempool.start())
def notify(self, touched):
'''Called when addresses are touched by new blocks or mempool
updates.'''
self.server_mgr.notify(self.height, touched)
def on_cancel(self):
'''Called when the main loop is cancelled.'''
self.server_mgr.stop()
super().on_cancel()
async def wait_shutdown(self):
'''Wait for shutdown to complete cleanly, and return.'''
await self.server_mgr.wait_shutdown()
await super().wait_shutdown()
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
return self.mempool.transactions(hash168)
def mempool_value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
return self.mempool.value(hash168)
class MemPool(LoggedClass):
class MemPool(util.LoggedClass):
'''Representation of the daemon's mempool.
Updated regularly in caught-up state. Goal is to enable efficient
@@ -91,19 +42,21 @@ class MemPool(LoggedClass):
tx's txins are unconfirmed. tx hashes are hex strings.
'''
def __init__(self, bp):
def __init__(self, daemon, coin, db, manager):
super().__init__()
self.daemon = daemon
self.coin = coin
self.db = db
self.manager = manager
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
self.bp = bp
self.count = -1
def start(self):
'''Starts the mempool synchronization mainloop. Return a future.'''
return asyncio.ensure_future(self.main_loop())
async def main_loop(self, caught_up):
'''Asynchronously maintain mempool status with daemon.
async def main_loop(self):
'''Asynchronously maintain mempool status with daemon.'''
Waits until the caught up event is signalled.'''
await caught_up.wait()
self.logger.info('maintaining state with daemon...')
while True:
try:
@@ -120,7 +73,7 @@
Remove transactions that are no longer in our mempool.
Request new transactions we don't have then add to our mempool.
'''
hex_hashes = set(await self.bp.daemon.mempool_hashes())
hex_hashes = set(await self.daemon.mempool_hashes())
touched = set()
missing_utxos = []
@@ -145,7 +98,7 @@
# ones the daemon no longer has (it will return None). Put
# them into a dictionary of hex hash to deserialized tx.
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
raw_txs = await self.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('analysing {:,d} mempool txs'
.format(len(raw_txs)))
@@ -155,8 +108,8 @@
# The mempool is unordered, so process all outputs first so
# that looking for inputs has full info.
script_hash168 = self.bp.coin.hash168_from_script()
db_utxo_lookup = self.bp.db_utxo_lookup
script_hash168 = self.coin.hash168_from_script()
db_utxo_lookup = self.db.db_utxo_lookup
def txout_pair(txout):
return (script_hash168(txout.pk_script), txout.value)
@@ -195,7 +148,7 @@
try:
infos = (txin_info(txin) for txin in tx.inputs)
txin_pairs, unconfs = zip(*infos)
except self.bp.MissingUTXOError:
except self.db.MissingUTXOError:
# Drop this TX. If other mempool txs depend on it
# it's harmless - next time the mempool is refreshed
# they'll either be cleaned up or the UTXOs will no
@@ -227,7 +180,7 @@
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
self.bp.notify(touched)
self.manager.notify(touched)
def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@@ -254,26 +207,64 @@
return value
class ServerManager(LoggedClass):
'''Manages the servers.'''
class ServerManager(util.LoggedClass):
'''Manages the client servers, a mempool, and a block processor.
Servers are started immediately the block processor first catches
up with the daemon.
'''
MgrTask = namedtuple('MgrTask', 'session task')
def __init__(self, bp, env):
def __init__(self, env):
super().__init__()
self.bp = bp
self.bp = BlockProcessor(self, env)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
self.irc = IRC(env)
self.env = env
self.servers = []
self.irc = IRC(env)
self.sessions = {}
self.max_subs = env.max_subs
self.subscription_count = 0
self.irc_future = None
self.futures = []
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
.format(env.max_session_subs))
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168.
unconfirmed is True if any txin is unconfirmed.
'''
return self.mempool.transactions(hash168)
def mempool_value(self, hash168):
'''Return the unconfirmed amount in the mempool for hash168.
Can be positive or negative.
'''
return self.mempool.value(hash168)
async def main_loop(self):
'''Server manager main loop.'''
def add_future(coro):
self.futures.append(asyncio.ensure_future(coro))
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop())
add_future(self.mempool.main_loop(self.bp.event))
add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event))
for future in asyncio.as_completed(self.futures):
try:
await future # Note: future is not one of self.futures
except asyncio.CancelledError:
break
await self.shutdown()
async def start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop()
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
@@ -290,12 +281,14 @@ class ServerManager(LoggedClass):
self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port))
async def start_servers(self):
async def start_servers(self, caught_up):
'''Connect to IRC and start listening for incoming connections.
Only connect to IRC if enabled. Start listening on RCP, TCP
and SSL ports only if the port wasn pecified.
and SSL ports only if the port wasn't pecified. Waits for the
caught_up event to be signalled.
'''
await caught_up.wait()
env = self.env
if env.rpc_port is not None:
@@ -310,40 +303,34 @@
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
if env.irc:
self.irc_future = asyncio.ensure_future(self.irc.start())
else:
self.logger.info('IRC disabled')
def notify(self, height, touched):
def notify(self, touched):
'''Notify sessions about height changes and touched addresses.'''
cache = {}
for session in self.sessions:
if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.jobs.put_nowait((height, touched, cache))
session.jobs.put_nowait((self.bp.height, touched, cache))
def stop(self):
'''Close listening servers.'''
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
for future in self.futures:
future.cancel()
for server in self.servers:
server.close()
if self.irc_future:
self.irc_future.cancel()
await server.wait_closed()
self.servers = [] # So add_session closes new sessions
while not all(future.done() for future in self.futures):
await asyncio.sleep(0)
if self.sessions:
self.logger.info('cleanly closing client sessions, please wait...')
await self.close_sessions()
await self.bp.shutdown()
async def close_sessions(self, secs=60):
self.logger.info('cleanly closing client sessions, please wait...')
for session in self.sessions:
self.close_session(session)
async def wait_shutdown(self):
# Wait for servers to close
for server in self.servers:
await server.wait_closed()
self.servers = []
secs = 60
self.logger.info('server listening sockets closed, waiting '
'{:d} seconds for socket cleanup'.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit:
await asyncio.sleep(4)
@@ -628,7 +615,7 @@ class ElectrumX(Session):
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.async_get_history(hash168)
mempool = self.bp.mempool_transactions(hash168)
mempool = self.manager.mempool_transactions(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
@@ -666,7 +653,7 @@ class ElectrumX(Session):
def unconfirmed_history(self, hash168):
# Note unconfirmed history is unordered in electrum-server
# Height is -1 if unconfirmed txins, otherwise 0
mempool = self.bp.mempool_transactions(hash168)
mempool = self.manager.mempool_transactions(hash168)
return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee}
for tx_hash, fee, unconfirmed in mempool]
@@ -707,7 +694,7 @@ class ElectrumX(Session):
async def get_balance(self, hash168):
utxos = await self.get_utxos(hash168)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = self.bp.mempool_value(hash168)
unconfirmed = self.manager.mempool_value(hash168)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def list_unspent(self, hash168):
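The new ServerManager.main_loop supervises every long-running task through one list of futures: it awaits them as they complete, treats the first cancellation as the shutdown signal, then cancels and drains the rest. A standalone sketch of that pattern with placeholder tasks, not ElectrumX code:

import asyncio

async def worker(name):
    try:
        while True:
            await asyncio.sleep(1)     # placeholder for real work
    except asyncio.CancelledError:
        print(name, 'stopped')
        raise

async def main_loop():
    futures = [asyncio.ensure_future(worker(name))
               for name in ('block processor', 'mempool', 'servers')]
    # Simulate an external shutdown request by cancelling one task.
    asyncio.get_running_loop().call_later(0.1, futures[0].cancel)
    for future in asyncio.as_completed(futures):
        try:
            await future               # note: not necessarily one of `futures`
        except asyncio.CancelledError:
            break                      # first cancellation starts shutdown
    for future in futures:             # shutdown: cancel and drain the rest
        future.cancel()
    await asyncio.gather(*futures, return_exceptions=True)

asyncio.run(main_loop())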

server/version.py (2 changed lines)

@@ -1 +1 @@
VERSION = "ElectrumX 0.7.4"
VERSION = "ElectrumX 0.7.5"
