Browse Source

Merge branch 'release-0.10.5'

master 0.10.5
Neil Booth 8 years ago
parent
commit
a50888c932
  1. 11
      README.rst
  2. 14
      docs/HOWTO.rst
  3. 16
      docs/RPC-INTERFACE.rst
  4. 9
      electrumx_server.py
  5. 10
      server/block_processor.py
  6. 122
      server/controller.py
  7. 6
      server/daemon.py
  8. 5
      server/irc.py
  9. 2
      server/version.py

11
README.rst

@ -124,9 +124,6 @@ Roadmap Post-1.0
- Python 3.6, which has several performance improvements relevant to - Python 3.6, which has several performance improvements relevant to
ElectrumX ElectrumX
- UTXO root logic and implementation - UTXO root logic and implementation
- improve DB abstraction so LMDB is not penalized
- investigate effects of cache defaults and DB configuration defaults
on sync time and simplify / optimize the default config accordingly
- potentially move some functionality to C or C++ - potentially move some functionality to C or C++
@ -140,6 +137,13 @@ version prior to the release of 1.0.
ChangeLog ChangeLog
========= =========
Version 0.10.5
--------------
* fix for second part of issue `#100`_ where the ElectrumX was not
killable if bitcoind was unavailable
Version 0.10.4 Version 0.10.4
-------------- --------------
@ -254,6 +258,7 @@ Version 0.9.17
.. _#93: https://github.com/kyuupichan/electrumx/issues/93 .. _#93: https://github.com/kyuupichan/electrumx/issues/93
.. _#94: https://github.com/kyuupichan/electrumx/issues/94 .. _#94: https://github.com/kyuupichan/electrumx/issues/94
.. _#99: https://github.com/kyuupichan/electrumx/issues/99 .. _#99: https://github.com/kyuupichan/electrumx/issues/99
.. _#100: https://github.com/kyuupichan/electrumx/issues/100
.. _#101: https://github.com/kyuupichan/electrumx/issues/101 .. _#101: https://github.com/kyuupichan/electrumx/issues/101
.. _#102: https://github.com/kyuupichan/electrumx/issues/102 .. _#102: https://github.com/kyuupichan/electrumx/issues/102
.. _#103: https://github.com/kyuupichan/electrumx/issues/103 .. _#103: https://github.com/kyuupichan/electrumx/issues/103

14
docs/HOWTO.rst

@ -45,16 +45,15 @@ recommend having at least 30-40GB of free space before starting.
Database Engine Database Engine
=============== ===============
You can choose from RocksDB, LevelDB or LMDB to store transaction You can choose from LevelDB and RocksDB to store transaction
information on disk. Currently, the fastest seems to be RocksDB with information on disk. The time taken and DB size is not significantly
LevelDB being slightly slower. LMDB is slowest but that is because the different. We tried to support LMDB but its history write performance
code needs reworking to be better usable with LMDB. was much worse.
You will need to install one of: You will need to install one of:
+ `plyvel <https://plyvel.readthedocs.io/en/latest/installation.html>`_ for LevelDB + `plyvel <https://plyvel.readthedocs.io/en/latest/installation.html>`_ for LevelDB
+ `pyrocksdb <http://pyrocksdb.readthedocs.io/en/v0.4/installation.html>`_ for RocksDB + `pyrocksdb <http://pyrocksdb.readthedocs.io/en/v0.4/installation.html>`_ for RocksDB
+ `lmdb <https://lmdb.readthedocs.io/en/release/#installation-unix>`_ for LMDB
Running Running
======= =======
@ -234,8 +233,9 @@ Terminating ElectrumX
===================== =====================
The preferred way to terminate the server process is to send it the The preferred way to terminate the server process is to send it the
INT or TERM signals. For a daemontools supervised process this is best **stop** RPC command, or alternatively on Unix the INT or TERM
done by bringing it down like so:: signals. For a daemontools supervised process this can be done by
bringing it down like so::
svc -d ~/service/electrumx svc -d ~/service/electrumx

16
docs/RPC-INTERFACE.rst

@ -2,14 +2,26 @@ The ElectrumX RPC Interface
=========================== ===========================
You can query the status of a running server, and affect its behaviour You can query the status of a running server, and affect its behaviour
using the RPC interface. by sending JSON RPC commands to the LocalRPC port it is listening on.
This is best done using the electrumx_rpc.py script provided.
The general form of invocation is: The general form of invocation is:
``electrumx_rpc.py <command> [arg1 [arg2...]`` ``electrumx_rpc.py [-p PORT] <command> [arg1 [arg2...]``
The port to send the commands to can be specified on the command line,
otherwise it is taken from the environment variable **RPC_PORT**, or
8000 is used if that is not set.
The following commands are available: The following commands are available:
* **stop**
Flush all cached data to disk and shut down the server cleanly, as
if sending the KILL signal. Be patient - during initial sync
flushing all cached data to disk can take several minutes. This
command takes no arguments.
* **getinfo** * **getinfo**
Returns a summary of server state. This command takes no arguments. Returns a summary of server state. This command takes no arguments.

9
electrumx_server.py

@ -36,8 +36,9 @@ def main_loop():
def on_signal(signame): def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.''' '''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, shutting down'.format(signame)) logging.warning('received {} signal, initiating shutdown'
future.cancel() .format(signame))
controller.initiate_shutdown()
def on_exception(loop, context): def on_exception(loop, context):
'''Suppress spurious messages it appears we cannot control.''' '''Suppress spurious messages it appears we cannot control.'''
@ -47,8 +48,8 @@ def main_loop():
'accept_connection2()' in repr(context.get('task'))): 'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context) loop.default_exception_handler(context)
server = Controller(Env()) controller = Controller(Env())
future = asyncio.ensure_future(server.main_loop()) future = asyncio.ensure_future(controller.main_loop())
# Install signal handlers # Install signal handlers
for signame in ('SIGINT', 'SIGTERM'): for signame in ('SIGINT', 'SIGTERM'):

10
server/block_processor.py

@ -15,7 +15,7 @@ import time
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from server.daemon import Daemon, DaemonError from server.daemon import DaemonError
from server.version import VERSION from server.version import VERSION
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.util import chunks, formatted_time, LoggedClass from lib.util import chunks, formatted_time, LoggedClass
@ -138,8 +138,9 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations. Coordinate backing up in case of chain reorganisations.
''' '''
def __init__(self, env): def __init__(self, env, daemon):
super().__init__(env) super().__init__(env)
self.daemon = daemon
# These are our state as we move ahead of DB state # These are our state as we move ahead of DB state
self.fs_height = self.db_height self.fs_height = self.db_height
@ -148,7 +149,6 @@ class BlockProcessor(server.db.DB):
self.tip = self.db_tip self.tip = self.db_tip
self.tx_count = self.db_tx_count self.tx_count = self.db_tx_count
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up_event = asyncio.Event() self.caught_up_event = asyncio.Event()
self.task_queue = asyncio.Queue() self.task_queue = asyncio.Queue()
self.stop = False self.stop = False
@ -193,8 +193,10 @@ class BlockProcessor(server.db.DB):
'''Called by the controller to shut processing down.''' '''Called by the controller to shut processing down.'''
async def do_nothing(): async def do_nothing():
pass pass
self.logger.info('preparing clean shutdown')
self.stop = True self.stop = True
self.add_task(do_nothing) # Ensure something is on the queue # Ensure something is on the queue so main_loop notices self.stop
self.add_task(do_nothing)
async def main_loop(self): async def main_loop(self):
'''Main loop for block processing.''' '''Main loop for block processing.'''

122
server/controller.py

@ -9,7 +9,6 @@ import asyncio
import codecs import codecs
import json import json
import os import os
import _socket
import ssl import ssl
import time import time
from bisect import bisect_left from bisect import bisect_left
@ -22,6 +21,7 @@ from lib.jsonrpc import JSONRPC, RPCError, RequestBase
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util import lib.util as util
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.daemon import Daemon
from server.irc import IRC from server.irc import IRC
from server.session import LocalRPC, ElectrumX from server.session import LocalRPC, ElectrumX
from server.mempool import MemPool from server.mempool import MemPool
@ -50,11 +50,13 @@ class Controller(util.LoggedClass):
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
# Set this event to cleanly shutdown
self.shutdown_event = asyncio.Event()
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.start = time.time() self.start = time.time()
self.coin = env.coin self.coin = env.coin
self.bp = BlockProcessor(env) self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.daemon = self.bp.daemon self.bp = BlockProcessor(env, self.daemon)
self.mempool = MemPool(self.bp) self.mempool = MemPool(self.bp)
self.irc = IRC(env) self.irc = IRC(env)
self.env = env self.env = env
@ -76,11 +78,11 @@ class Controller(util.LoggedClass):
self.delayed_sessions = [] self.delayed_sessions = []
self.next_queue_id = 0 self.next_queue_id = 0
self.cache_height = 0 self.cache_height = 0
self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.setup_bands() self.setup_bands()
# Set up the RPC request handlers # Set up the RPC request handlers
cmds = 'disconnect getinfo groups log peers reorg sessions'.split() cmds = ('disconnect getinfo groups log peers reorg sessions stop'
.split())
self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds} self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
# Set up the ElectrumX request handlers # Set up the ElectrumX request handlers
rpcs = [ rpcs = [
@ -189,37 +191,75 @@ class Controller(util.LoggedClass):
if session in self.sessions: if session in self.sessions:
await session.serve_requests() await session.serve_requests()
def initiate_shutdown(self):
'''Call this function to start the shutdown process.'''
self.shutdown_event.set()
async def main_loop(self): async def main_loop(self):
'''Controller main loop.''' '''Controller main loop.'''
def add_future(coro): def add_future(coro):
self.futures.append(asyncio.ensure_future(coro)) futures.append(asyncio.ensure_future(coro))
# shutdown() assumes bp.main_loop() is first async def await_bp_catchup():
add_future(self.bp.main_loop()) '''Wait for the block processor to catch up.
When it has, start the servers and connect to IRC.
'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
add_future(self.irc.start())
add_future(self.start_servers())
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
add_future(self.notify())
for n in range(4):
add_future(self.serve_requests())
bp_future = asyncio.ensure_future(self.bp.main_loop())
futures = []
add_future(self.bp.prefetcher.main_loop()) add_future(self.bp.prefetcher.main_loop())
add_future(self.irc.start(self.bp.caught_up_event)) add_future(await_bp_catchup())
add_future(self.start_servers(self.bp.caught_up_event))
add_future(self.mempool.main_loop()) # Perform a clean shutdown when this event is signalled.
add_future(self.enqueue_delayed_sessions()) await self.shutdown_event.wait()
add_future(self.notify()) self.logger.info('shutting down gracefully')
for n in range(4): self.state = self.SHUTTING_DOWN
add_future(self.serve_requests())
# First tell the block processor to shut down, it may need to
for future in asyncio.as_completed(self.futures): # perform a lengthy flush. Then shut down the rest.
try: self.bp.on_shutdown()
await future # Note: future is not one of self.futures self.close_servers(list(self.servers.keys()))
except asyncio.CancelledError: for future in futures:
break future.cancel()
await self.shutdown() # Now wait for the cleanup to complete
await self.close_sessions()
if not bp_future.done():
self.logger.info('waiting for block processor')
await bp_future
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
self.logger.info('closing down {} listening servers'
.format(', '.join(kinds)))
for kind in kinds: for kind in kinds:
server = self.servers.pop(kind, None) server = self.servers.pop(kind, None)
if server: if server:
server.close() server.close()
# Don't bother awaiting the close - we're not async
async def close_sessions(self, secs=30):
if not self.sessions:
return
self.logger.info('waiting up to {:d} seconds for socket cleanup'
.format(secs))
for session in self.sessions:
self.close_session(session)
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
@ -236,12 +276,10 @@ class Controller(util.LoggedClass):
self.logger.info('{} server listening on {}:{:d}' self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self, caught_up): async def start_servers(self):
'''Start RPC, TCP and SSL servers once caught up.''' '''Start RPC, TCP and SSL servers once caught up.'''
if self.env.rpc_port is not None: if self.env.rpc_port is not None:
await self.start_server('RPC', 'localhost', self.env.rpc_port) await self.start_server('RPC', 'localhost', self.env.rpc_port)
await caught_up.wait()
_socket.setdefaulttimeout(5)
self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
.format(self.env.session_timeout)) .format(self.env.session_timeout))
@ -312,31 +350,6 @@ class Controller(util.LoggedClass):
self.header_cache[height] = header self.header_cache[height] = header
return header return header
async def shutdown(self):
'''Call to shutdown everything. Returns when done.'''
self.state = self.SHUTTING_DOWN
self.bp.on_shutdown()
self.close_servers(list(self.servers.keys()))
# Don't cancel the block processor main loop - let it close itself
for future in self.futures[1:]:
future.cancel()
if self.sessions:
await self.close_sessions()
await self.futures[0]
async def close_sessions(self, secs=30):
self.logger.info('cleanly closing client sessions, please wait...')
for session in self.sessions:
self.close_session(session)
self.logger.info('listening sockets closed, waiting up to '
'{:d} seconds for socket cleanup'.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
def add_session(self, session): def add_session(self, session):
now = time.time() now = time.time()
if now > self.next_stale_check: if now > self.next_stale_check:
@ -559,6 +572,11 @@ class Controller(util.LoggedClass):
''' '''
return self.for_each_session(session_ids, self.toggle_logging) return self.for_each_session(session_ids, self.toggle_logging)
async def rpc_stop(self):
'''Shut down the server cleanly.'''
self.initiate_shutdown()
return 'stopping'
async def rpc_getinfo(self): async def rpc_getinfo(self):
'''Return summary information about the server process.''' '''Return summary information about the server process.'''
return self.server_summary() return self.server_summary()

6
server/daemon.py

@ -10,6 +10,7 @@ daemon.'''
import asyncio import asyncio
import json import json
import traceback
import aiohttp import aiohttp
@ -82,11 +83,14 @@ class Daemon(util.LoggedClass):
except aiohttp.ClientConnectionError: except aiohttp.ClientConnectionError:
log_error('connection problem - is your daemon running?') log_error('connection problem - is your daemon running?')
except self.DaemonWarmingUpError: except self.DaemonWarmingUpError:
log_error('still starting up checking blocks.') log_error('starting up checking blocks.')
except (asyncio.CancelledError, DaemonError): except (asyncio.CancelledError, DaemonError):
raise raise
except Exception as e: except Exception as e:
self.log_error(traceback.format_exc())
self.log_error('response was: {}'.format(resp))
log_error('request gave unexpected error: {}.'.format(e)) log_error('request gave unexpected error: {}.'.format(e))
if secs >= max_secs and len(self.urls) > 1: if secs >= max_secs and len(self.urls) > 1:
self.url_index = (self.url_index + 1) % len(self.urls) self.url_index = (self.url_index + 1) % len(self.urls)
logged_url = self.logged_url(self.urls[self.url_index]) logged_url = self.logged_url(self.urls[self.url_index])

5
server/irc.py

@ -55,9 +55,8 @@ class IRC(LoggedClass):
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {} self.peers = {}
async def start(self, caught_up): async def start(self):
'''Start IRC connections once caught up if enabled in environment.''' '''Start IRC connections if enabled in environment.'''
await caught_up.wait()
try: try:
if self.env.irc: if self.env.irc:
await self.join() await self.join()

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.10.4" VERSION = "ElectrumX 0.10.5"

Loading…
Cancel
Save