Browse Source

Merge branch 'master' into devel

patch-2
Neil Booth 7 years ago
parent
commit
2803ef913e
  1. 3
      docs/changelog.rst
  2. 3
      electrumx/lib/server_base.py
  3. 12
      electrumx/server/chain_state.py
  4. 4
      electrumx/server/controller.py
  5. 46
      electrumx/server/peers.py
  6. 31
      electrumx/server/session.py
  7. 4
      electrumx_rpc

3
docs/changelog.rst

@@ -11,6 +11,8 @@
Version 1.8.1 (in development) Version 1.8.1 (in development)
============================== ==============================
* fix `#557`_
Version 1.8 (06 Aug 2018) Version 1.8 (06 Aug 2018)
========================== ==========================
@@ -199,3 +201,4 @@ bitcoincash:qzxpdlt8ehu9ehftw6rqsy2jgfq4nsltxvhrdmdfpn
.. _#523: https://github.com/kyuupichan/electrumx/issues/523 .. _#523: https://github.com/kyuupichan/electrumx/issues/523
.. _#534: https://github.com/kyuupichan/electrumx/issues/534 .. _#534: https://github.com/kyuupichan/electrumx/issues/534
.. _#538: https://github.com/kyuupichan/electrumx/issues/538 .. _#538: https://github.com/kyuupichan/electrumx/issues/538
.. _#557: https://github.com/kyuupichan/electrumx/issues/557

3
electrumx/lib/server_base.py

@@ -29,6 +29,7 @@ class ServerBase(object):
''' '''
SUPPRESS_MESSAGE_REGEX = re.compile('SSH handshake') SUPPRESS_MESSAGE_REGEX = re.compile('SSH handshake')
SUPPRESS_TASK_REGEX = re.compile('accept_connection2')
PYTHON_MIN_VERSION = (3, 6) PYTHON_MIN_VERSION = (3, 6)
def __init__(self, env): def __init__(self, env):
@@ -68,6 +69,8 @@ class ServerBase(object):
message = context.get('message') message = context.get('message')
if message and self.SUPPRESS_MESSAGE_REGEX.match(message): if message and self.SUPPRESS_MESSAGE_REGEX.match(message):
return return
if self.SUPPRESS_TASK_REGEX.match(repr(context.get('task'))):
return
loop.default_exception_handler(context) loop.default_exception_handler(context)
async def _main(self, loop): async def _main(self, loop):

12
electrumx/server/chain_state.py

@@ -6,8 +6,6 @@
# and warranty status of this software. # and warranty status of this software.
import asyncio
from electrumx.lib.hash import hash_to_hex_str from electrumx.lib.hash import hash_to_hex_str
@@ -72,13 +70,9 @@ class ChainState(object):
except ValueError: except ValueError:
pass pass
try: hashX = coin.address_to_hashX(arg)
hashX = coin.address_to_hashX(arg) lines.append(f'Address: {arg}')
lines.append(f'Address: {arg}') return hashX
return hashX
except Base58Error:
print(f'Ingoring unknown arg: {arg}')
return None
for arg in args: for arg in args:
hashX = arg_to_hashX(arg) hashX = arg_to_hashX(arg)

4
electrumx/server/controller.py

@@ -80,8 +80,8 @@ class Controller(ServerBase):
'''Start the RPC server and wait for the mempool to synchronize. Then '''Start the RPC server and wait for the mempool to synchronize. Then
start serving external clients. start serving external clients.
''' '''
if not (0, 6, 2) <= aiorpcx_version < (0, 7): if not (0, 7) <= aiorpcx_version < (0, 8):
raise RuntimeError('aiorpcX version 0.6.x with x>=2 required') raise RuntimeError('aiorpcX version 0.7.x required')
env = self.env env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()

46
electrumx/server/peers.py

@@ -16,8 +16,8 @@ from collections import defaultdict, Counter
from aiorpcx import (ClientSession, SOCKSProxy, from aiorpcx import (ClientSession, SOCKSProxy,
Notification, handler_invocation, Notification, handler_invocation,
SOCKSError, RPCError, TaskTimeout, SOCKSError, RPCError, TaskTimeout, TaskGroup, Event,
TaskGroup, ignore_after, timeout_after) sleep, run_in_thread, ignore_after, timeout_after)
from electrumx.lib.peer import Peer from electrumx.lib.peer import Peer
from electrumx.lib.util import class_logger, protocol_tuple from electrumx.lib.util import class_logger, protocol_tuple
@@ -74,7 +74,7 @@ class PeerManager(object):
self.peers = set() self.peers = set()
self.permit_onion_peer_time = time.time() self.permit_onion_peer_time = time.time()
self.proxy = None self.proxy = None
self.task_group = None self.group = TaskGroup()
def _my_clearnet_peer(self): def _my_clearnet_peer(self):
'''Returns the clearnet peer representing this server, if any.''' '''Returns the clearnet peer representing this server, if any.'''
@@ -150,7 +150,7 @@ class PeerManager(object):
self.logger.info(f'detected {proxy}') self.logger.info(f'detected {proxy}')
return return
self.logger.info('no proxy detected, will try later') self.logger.info('no proxy detected, will try later')
await asyncio.sleep(900) await sleep(900)
async def _note_peers(self, peers, limit=2, check_ports=False, async def _note_peers(self, peers, limit=2, check_ports=False,
source=None): source=None):
@@ -178,9 +178,9 @@ class PeerManager(object):
use_peers = new_peers use_peers = new_peers
for peer in use_peers: for peer in use_peers:
self.logger.info(f'accepted new peer {peer} from {source}') self.logger.info(f'accepted new peer {peer} from {source}')
peer.retry_event = asyncio.Event() peer.retry_event = Event()
self.peers.add(peer) self.peers.add(peer)
await self.task_group.spawn(self._monitor_peer(peer)) await self.group.spawn(self._monitor_peer(peer))
async def _monitor_peer(self, peer): async def _monitor_peer(self, peer):
# Stop monitoring if we were dropped (a duplicate peer) # Stop monitoring if we were dropped (a duplicate peer)
@@ -231,16 +231,14 @@ class PeerManager(object):
is_good = True is_good = True
break break
except BadPeerError as e: except BadPeerError as e:
self.logger.error(f'{peer_text} marking bad: ({e!r})') self.logger.error(f'{peer_text} marking bad: ({e})')
peer.mark_bad() peer.mark_bad()
break break
except RPCError as e: except RPCError as e:
self.logger.error(f'{peer_text} RPC error: {e.message} ' self.logger.error(f'{peer_text} RPC error: {e.message} '
f'({e.code})') f'({e.code})')
except TaskTimeout as e: except (OSError, SOCKSError, ConnectionError, TaskTimeout) as e:
self.logger.error(f'{peer_text} timed out after {e.args[0]}s') self.logger.info(f'{peer_text} {e}')
except (OSError, SOCKSError, ConnectionError) as e:
self.logger.info(f'{peer_text} {e!r}')
if is_good: if is_good:
now = time.time() now = time.time()
@@ -292,10 +290,15 @@ class PeerManager(object):
peer.features['server_version'] = server_version peer.features['server_version'] = server_version
ptuple = protocol_tuple(protocol_version) ptuple = protocol_tuple(protocol_version)
# FIXME: make these concurrent with first exception preserved # Do the rest concurrently
await self._send_headers_subscribe(session, peer, ptuple) async with TaskGroup() as group:
await self._send_server_features(session, peer) await group.spawn(self._send_headers_subscribe(session, peer,
await self._send_peers_subscribe(session, peer) ptuple))
await group.spawn(self._send_server_features(session, peer))
await group.spawn(self._send_peers_subscribe(session, peer))
# If any task errors out; bail
async for task in group:
task.result()
async def _send_headers_subscribe(self, session, peer, ptuple): async def _send_headers_subscribe(self, session, peer, ptuple):
message = 'blockchain.headers.subscribe' message = 'blockchain.headers.subscribe'
@@ -374,7 +377,7 @@ class PeerManager(object):
# #
# External interface # External interface
# #
async def discover_peers(self, task_group): async def discover_peers(self):
'''Perform peer maintenance. This includes '''Perform peer maintenance. This includes
1) Forgetting unreachable peers. 1) Forgetting unreachable peers.
@@ -387,9 +390,14 @@ class PeerManager(object):
self.logger.info(f'beginning peer discovery. Force use of ' self.logger.info(f'beginning peer discovery. Force use of '
f'proxy: {self.env.force_proxy}') f'proxy: {self.env.force_proxy}')
self.task_group = task_group forever = Event()
await task_group.spawn(self._detect_proxy()) async with self.group as group:
await task_group.spawn(self._import_peers()) await group.spawn(forever.wait())
await group.spawn(self._detect_proxy())
await group.spawn(self._import_peers())
# Consume tasks as they complete
async for task in group:
task.result()
def info(self): def info(self):
'''The number of peers.''' '''The number of peers.'''

31
electrumx/server/session.py

@@ -20,15 +20,15 @@ from collections import defaultdict
from functools import partial from functools import partial
from aiorpcx import ( from aiorpcx import (
ServerSession, JSONRPCAutoDetect, TaskGroup, handler_invocation, ServerSession, JSONRPCAutoDetect, JSONRPCConnection,
RPCError, Request, ignore_after TaskGroup, handler_invocation, RPCError, Request, ignore_after
) )
import electrumx import electrumx
import electrumx.lib.text as text import electrumx.lib.text as text
import electrumx.lib.util as util import electrumx.lib.util as util
from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash, from electrumx.lib.hash import (sha256, hash_to_hex_str, hex_str_to_hash,
HASHX_LEN) HASHX_LEN, Base58Error)
from electrumx.lib.peer import Peer from electrumx.lib.peer import Peer
from electrumx.server.daemon import DaemonError from electrumx.server.daemon import DaemonError
from electrumx.server.peers import PeerManager from electrumx.server.peers import PeerManager
@@ -259,7 +259,7 @@ class SessionManager(object):
# Give the sockets some time to close gracefully # Give the sockets some time to close gracefully
async with TaskGroup() as group: async with TaskGroup() as group:
for session in stale_sessions: for session in stale_sessions:
await group.spawn(session.close(force_after=30)) await group.spawn(session.close())
# Consolidate small groups # Consolidate small groups
bw_limit = self.env.bandwidth_limit bw_limit = self.env.bandwidth_limit
@@ -391,7 +391,10 @@ class SessionManager(object):
async def rpc_query(self, items, limit): async def rpc_query(self, items, limit):
'''Return a list of data about server peers.''' '''Return a list of data about server peers.'''
return await self.chain_state.query(items, limit) try:
return await self.chain_state.query(items, limit)
except Base58Error as e:
raise RPCError(BAD_REQUEST, e.args[0]) from None
async def rpc_sessions(self): async def rpc_sessions(self):
'''Return statistics about connected sessions.''' '''Return statistics about connected sessions.'''
@@ -434,8 +437,8 @@ class SessionManager(object):
await self._start_external_servers() await self._start_external_servers()
# Peer discovery should start after the external servers # Peer discovery should start after the external servers
# because we connect to ourself # because we connect to ourself
async with TaskGroup(wait=object) as group: async with TaskGroup() as group:
await group.spawn(self.peer_mgr.discover_peers(group)) await group.spawn(self.peer_mgr.discover_peers())
await group.spawn(self._clear_stale_sessions()) await group.spawn(self._clear_stale_sessions())
await group.spawn(self._log_sessions()) await group.spawn(self._log_sessions())
await group.spawn(self._restart_if_paused()) await group.spawn(self._restart_if_paused())
@@ -445,7 +448,7 @@ class SessionManager(object):
await self._close_servers(list(self.servers.keys())) await self._close_servers(list(self.servers.keys()))
async with TaskGroup() as group: async with TaskGroup() as group:
for session in list(self.sessions): for session in list(self.sessions):
await group.spawn(session.close(force_after=0.1)) await group.spawn(session.close(force_after=1))
def session_count(self): def session_count(self):
'''The number of connections that we've sent something to.''' '''The number of connections that we've sent something to.'''
@@ -516,7 +519,8 @@ class SessionBase(ServerSession):
session_counter = itertools.count() session_counter = itertools.count()
def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind): def __init__(self, session_mgr, chain_state, mempool, peer_mgr, kind):
super().__init__(protocol=JSONRPCAutoDetect) connection = JSONRPCConnection(JSONRPCAutoDetect)
super().__init__(connection=connection)
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.session_mgr = session_mgr self.session_mgr = session_mgr
self.chain_state = chain_state self.chain_state = chain_state
@@ -620,7 +624,7 @@ class ElectrumX(SessionBase):
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_headers_raw = False self.subscribe_headers_raw = False
self.notified_height = None self.notified_height = None
self.connection._max_response_size = self.env.max_send self.connection.max_response_size = self.env.max_send
self.max_subs = self.env.max_session_subs self.max_subs = self.env.max_session_subs
self.hashX_subs = {} self.hashX_subs = {}
self.sv_seen = False self.sv_seen = False
@@ -658,13 +662,6 @@ class ElectrumX(SessionBase):
def protocol_version_string(self): def protocol_version_string(self):
return util.version_string(self.protocol_tuple) return util.version_string(self.protocol_tuple)
# FIXME: make this the aiorpcx API for version 0.7
async def close(self, force_after=30):
'''Close the connection and return when closed.'''
async with ignore_after(force_after):
await super().close()
self.abort()
async def daemon_request(self, method, *args): async def daemon_request(self, method, *args):
'''Catch a DaemonError and convert it to an RPCError.''' '''Catch a DaemonError and convert it to an RPCError.'''
try: try:

4
electrumx_rpc

@@ -113,7 +113,7 @@ def main():
# aiorpcX makes this so easy... # aiorpcX makes this so easy...
async def send_request(): async def send_request():
try: try:
async with timeout_after(1): async with timeout_after(15):
async with ClientSession('localhost', port) as session: async with ClientSession('localhost', port) as session:
result = await session.send_request(method, args) result = await session.send_request(method, args)
if method in ('query', ): if method in ('query', ):
@@ -128,8 +128,6 @@ def main():
except OSError: except OSError:
print('cannot connect - is ElectrumX catching up, not running, or ' print('cannot connect - is ElectrumX catching up, not running, or '
f'is {port} the wrong RPC port?') f'is {port} the wrong RPC port?')
except TaskTimeout as e:
print(f'request timed out after {e.args[0]}s')
except Exception as e: except Exception as e:
print(f'error making request: {e!r}') print(f'error making request: {e!r}')

Loading…
Cancel
Save