Browse Source

Merge branch 'develop'

master
Neil Booth 8 years ago
parent
commit
961e0e2a1a
  1. 7
      README.rst
  2. 165
      docs/PEER_DISCOVERY.rst
  3. 27
      lib/coins.py
  4. 29
      lib/jsonrpc.py
  5. 17
      lib/util.py
  6. 45
      server/controller.py
  7. 11
      server/db.py
  8. 34
      server/env.py
  9. 75
      server/peers.py
  10. 44
      server/session.py
  11. 2
      server/version.py

7
README.rst

@ -135,6 +135,12 @@ version prior to the release of 1.0.
ChangeLog
=========
Version 0.10.14
---------------
* misc cleanups and code changes to prepare for peer discovery in 0.11.0
* add `docs/PEER_DISCOVERY.rst`_
Version 0.10.13
---------------
@ -273,3 +279,4 @@ stability please stick with the 0.9 series.
.. _#102: https://github.com/kyuupichan/electrumx/issues/102
.. _#103: https://github.com/kyuupichan/electrumx/issues/103
.. _docs/HOWTO.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/HOWTO.rst
.. docs/PEER_DISCOVERY.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/PEER_DISCOVERY.rst

165
docs/PEER_DISCOVERY.rst

@ -0,0 +1,165 @@
Peer Discovery
==============
This is a suggestion of a peer discovery prtocol as a way to gradually
move off depending on IRC.
It will be implemented in ElectrumX from version 0.11.0
onwards.
Peer Database
-------------
A persistent store of peers with at least the following information
about a peer so that state persists across server restarts. This
information is required for a response to the **server.peers.subscribe**
RPC call:
* host name
* ip address
* TCP and SSL port numbers
* protocol version
* pruning limit, if any
At present ElectrumX uses a flat file for this DB in the main database
directory. It retains additional per-peer metadata including:
* time of last successful connection
* time of last connection attempt
* count of unsuccessful attempts since last successful one
* source of the information stored about this peer
Default Peers
-------------
This is a list of hard-coded, well-known peers to seed the peer
discovery process if the peer database is empty or corrupt. If the
peer database is available it is not used. Ideally it should hold up
to 10 servers that have shown commitment to reliable service.
In ElectrumX this is a per-coin property in `lib/coins.py`.
Response to server.peers.subscribe RPC call
-------------------------------------------
This RPC call is used by Electrum clients to get a list of peer
servers, in preference to a hard-coded list of peer servers in the
client, which it will fall back to if necessary.
The response should only include peers it has successfully connected
to recently. If Tor routing is not available to the server, so it
cannot connect to them, it should return a random limited number of
onion peers it is aware of.
In ElectrumX, "recently" is taken to be the last 24 hours, and it will
serve up to 3 onion peers if onion routing is not available.
Maintaining the Peer Database
-----------------------------
In order to keep its peer database up-to-date and fresh, if some time
has passed since the last successful connection to a peer, an Electrum
server should make an attempt to connect, choosing the TCP or SSL port
at random if both are available. On connecting it should issue
**server.peers.subscribe** and **server.features** RPC calls to
collect information about the server and its peers, and issue a
**server.add_peer** call to advertise itself. Once this is done and
replies received it should terminate the connection.
The peer database should prefer information obtained from the peer
itself over information obtained from any other source.
If a connection attempt fails, reconnection should follow some kind of
exponential backoff. If a long period of time has elapsed since the
successful connection attempt, the peer entry should be removed from
the database.
ElectrumX will choose the SSL port most of the time if both ports are
available. It tries to reconnect to each peer once every 24 hours and
drops peers if two weeks have passed since a successful connection.
server.features RPC call
------------------------
This is a new RPC call that a server can use to advertise what
services and features it offers. It is intended for use by Electrum
clients as well as peer servers. In the case of servers it is useful
in order to have database peer information sourced from the peer
itself.
The call takes no arguments and returns a dictionary keyed by feature
name whose value gives details about the feature where appropriate.
If a key is missing the feature is presumed not to be offered.
Currently ElectrumX understands and returns the following keys:
* **hosts**
An dictionary, keyed by host name, that this server can be reached
at. Normally this will only have a single entry; other entries can
be used in case there are other connection routes (e.g. ToR).
The value for a host is itself a dictionary, with the following
optional keys:
* **ssl_port**
An integer. Omit or set to *null* if SSL connectivity is not
provided.
* **tcp_port**
An integer. Omit or set to *null* if TCP connectivity is not
provided.
A server should ignore information provided about any host other
than the one it connected to.
* **server_version**
A string that identifies the server software. Should be the same as
the response to **server.version** RPC call.
* **protocol_version**
A string that is the Electrum protcol version. Should be the same
as what would suffix the letter 'v' in the IRC real name.
* **pruning**
An integer, the pruning limit. Omit or set to *null* if there is no
pruning limit. Should be the same as what would suffix the letter
'p' in the IRC real name.
server.add_peer RPC call
------------------------
This call is intended for a new server to get itself in the connected
set.
It takes a single parameter (named **features** if JSON RPCv2 named
parameters are being used) which contains the same information as the
**server.features** RPC call would return.
A server receiving a **server.add_peer** call should not replace
existing information about the host(s) given, but instead schedule a
separate connection to verify the information for itself.
To prevent abuse a server may want to ignore excessive calls to this
function.
IRC
---
Other server implementations may not have implemented the peer
discovery protocol yet. Whilst we transition away from IRC, in order
to keep these servers in the connected peer set, software implementing
this protocol should provide a way to occasionally connect to IRC to
pick up stragglers only advertising themselves there.

27
lib/coins.py

@ -40,6 +40,9 @@ class Coin(object):
IRC_SERVER = "irc.freenode.net"
IRC_PORT = 6667
HASHX_LEN = 11
# Peer discovery
PEER_DEFAULT_PORTS = {'t':'50001', 's':'50002'}
PEERS = []
@classmethod
def lookup_coin_class(cls, name, net):
@ -274,6 +277,21 @@ class Bitcoin(Coin):
IRC_PREFIX = "E_"
IRC_CHANNEL = "#electrum"
RPC_PORT = 8332
PEERS = [
'4cii7ryno5j3axe4.onion t'
'btc.smsys.me s995',
'ca6ulp2j2mpsft3y.onion s t',
'electrum.be s t',
'electrum.trouth.net s t',
'electrum.vom-stausee.de s t',
'electrum3.hachre.de s t',
'Electrum.hsmiths.com s t',
'erbium1.sytes.net s t',
'h.1209k.com s t',
'helicarrier.bauerj.eu s t',
'ozahtqwp25chjdjd.onion s t',
'us11.einfachmalnettsein.de s t',
]
class BitcoinTestnet(Bitcoin):
@ -292,7 +310,14 @@ class BitcoinTestnet(Bitcoin):
TX_PER_BLOCK = 21
IRC_PREFIX = "ET_"
RPC_PORT = 18332
PEER_DEFAULT_PORTS = {'t':'51001', 's':'51002'}
PEERS = [
'electrum.akinbo.org s t',
'he36kyperp3kbuxu.onion s t',
'electrum-btc-testnet.petrkr.net s t',
'h.hsmiths.com t53011 s53012',
'testnet.not.fyi s t',
]
class BitcoinTestnetSegWit(BitcoinTestnet):
'''Bitcoin Testnet for Core bitcoind >= 0.13.1.

29
lib/jsonrpc.py

@ -324,12 +324,15 @@ class JSONSessionBase(util.LoggedClass):
Flag the connection to close on a fatal error or too many errors.'''
version = self.version
self.error_count += 1
if code in (version.PARSE_ERROR, version.INVALID_REQUEST):
self.log_info(message)
self.close_after_send = True
elif self.error_count >= 10:
self.log_info('too many errors, last: {}'.format(message))
self.close_after_send = True
if not self.close_after_send:
fatal_log = None
if code in (version.PARSE_ERROR, version.INVALID_REQUEST):
fatal_log = message
elif self.error_count >= 10:
fatal_log = 'too many errors, last: {}'.format(message)
if fatal_log:
self.log_info(fatal_log)
self.close_after_send = True
return self.encode_payload(self.version.error_payload
(message, code, id_))
@ -449,6 +452,8 @@ class JSONSessionBase(util.LoggedClass):
return self.response_bytes(result, payload['id'])
except RPCError as e:
return self.error_bytes(e.msg, e.code, self.payload_id(payload))
except asyncio.CancelledError:
raise
except Exception:
self.log_error(traceback.format_exc())
return self.error_bytes('internal error processing request',
@ -698,3 +703,15 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
def send_bytes(self, binary):
'''Send JSON text over the transport.'''
self.transport.writelines((binary, b'\n'))
def peer_addr(self, anon=True):
'''Return the peer address and port.'''
peer_info = self.peer_info()
if not peer_info:
return 'unknown'
if anon:
return 'xx.xx.xx.xx:xx'
if ':' in peer_info[0]:
return '[{}]:{}'.format(peer_info[0], peer_info[1])
else:
return '{}:{}'.format(peer_info[0], peer_info[1])

17
lib/util.py

@ -181,11 +181,16 @@ class LogicalFile(object):
'''
file_num, offset = divmod(start, self.file_size)
filename = self.filename_fmt.format(file_num)
try:
f= open(filename, 'rb+')
except FileNotFoundError:
if not create:
raise
f = open(filename, 'wb+')
f = open_file(filename, create)
f.seek(offset)
return f
def open_file(filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise

45
server/controller.py

@ -53,7 +53,7 @@ class Controller(util.LoggedClass):
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peers = PeerManager(env, self)
self.peer_mgr = PeerManager(env, self)
self.env = env
self.servers = {}
# Map of session to the key of its list in self.groups
@ -65,7 +65,7 @@ class Controller(util.LoggedClass):
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.futures = set()
self.futures = {}
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
@ -163,7 +163,7 @@ class Controller(util.LoggedClass):
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.server_summary()))
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
await asyncio.sleep(1)
@ -208,28 +208,31 @@ class Controller(util.LoggedClass):
'''Schedule running func in the executor, return a task.'''
return self.ensure_future(self.run_in_executor(func, *args))
def ensure_future(self, coro):
def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.'''
future = asyncio.ensure_future(coro)
future.add_done_callback(self.on_future_done)
self.futures.add(future)
self.futures[future] = callback
return future
def on_future_done(self, future):
'''Collect the result of a future after removing it from our set.'''
self.futures.remove(future)
try:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.log_error(traceback.format_exc())
callback = self.futures.pop(future)
if callback:
callback(future)
else:
try:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.log_error(traceback.format_exc())
async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
self.ensure_future(self.peers.main_loop())
self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
@ -444,7 +447,7 @@ class Controller(util.LoggedClass):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
def server_summary(self):
def getinfo(self):
'''A one-line summary of server state.'''
return {
'daemon_height': self.daemon.cached_height(),
@ -455,7 +458,7 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(),
'peers': self.peers.count(),
'peers': self.peer_mgr.count(),
'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self.session_count(),
'subs': self.sub_count(),
@ -517,7 +520,7 @@ class Controller(util.LoggedClass):
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<6} {:<5} {:>15} {:>5} {:>5} '
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
@ -596,20 +599,20 @@ class Controller(util.LoggedClass):
def rpc_getinfo(self):
'''Return summary information about the server process.'''
return self.server_summary()
return self.getinfo()
def rpc_groups(self):
'''Return statistics about the session groups.'''
return self.group_data()
def rpc_peers(self):
'''Return a list of data about server peers.'''
return self.peer_mgr.rpc_data()
def rpc_sessions(self):
'''Return statistics about connected sessions.'''
return self.session_data(for_log=False)
def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.'''
return self.peers.peer_dict()
def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.

11
server/db.py

@ -99,7 +99,7 @@ class DB(util.LoggedClass):
self.logger.info('created new database')
self.logger.info('creating metadata diretcory')
os.mkdir('meta')
with self.open_file('COIN', create=True) as f:
with util.open_file('COIN', create=True) as f:
f.write('ElectrumX databases and metadata for {} {}'
.format(self.coin.NAME, self.coin.NET).encode())
else:
@ -183,15 +183,6 @@ class DB(util.LoggedClass):
self.clear_excess_history(self.utxo_flush_count)
self.clear_excess_undo_info()
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.

34
server/env.py

@ -8,12 +8,16 @@
'''Class for handling environment configuration and defaults.'''
from collections import namedtuple
from os import environ
from lib.coins import Coin
from lib.util import LoggedClass
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
class Env(LoggedClass):
'''Wraps environment configuration.'''
@ -55,18 +59,24 @@ class Env(LoggedClass):
# IRC
self.irc = self.default('IRC', False)
self.irc_nick = self.default('IRC_NICK', None)
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
self.report_host = self.default('REPORT_HOST', self.host)
self.report_tcp_port_tor = self.integer('REPORT_TCP_PORT_TOR',
self.report_tcp_port
if self.report_tcp_port else
self.tcp_port)
self.report_ssl_port_tor = self.integer('REPORT_SSL_PORT_TOR',
self.report_ssl_port
if self.report_ssl_port else
self.ssl_port)
self.report_host_tor = self.default('REPORT_HOST_TOR', '')
self.identity = NetIdentity(
self.default('REPORT_HOST', self.host),
self.integer('REPORT_TCP_PORT', self.tcp_port) or None,
self.integer('REPORT_SSL_PORT', self.ssl_port) or None,
''
)
self.tor_identity = NetIdentity(
self.default('REPORT_HOST_TOR', ''), # must be a string
self.integer('REPORT_TCP_PORT_TOR',
self.identity.tcp_port
if self.identity.tcp_port else
self.tcp_port) or None,
self.integer('REPORT_SSL_PORT_TOR',
self.identity.ssl_port
if self.identity.ssl_port else
self.ssl_port) or None,
'_tor'
)
def default(self, envvar, default):
return environ.get(envvar, default)

75
server/peers.py

@ -8,7 +8,6 @@
'''Peer management.'''
import asyncio
import itertools
import socket
from collections import namedtuple
@ -16,7 +15,6 @@ import lib.util as util
from server.irc import IRC
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
@ -26,8 +24,7 @@ class PeerManager(util.LoggedClass):
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
PROTOCOL_VERSION = '1.0'
def __init__(self, env, controller):
super().__init__()
@ -38,59 +35,44 @@ class PeerManager(util.LoggedClass):
self._identities = []
# Keyed by nick
self.irc_peers = {}
self.updated_nicks = set()
# We can have a Tor identity inaddition to a normal one
self._identities.append(self.identity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self._identities.append(self.identity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
@classmethod
def identity(self, host, tcp_port, ssl_port, suffix):
'''Returns a NetIdentity object. Unpublished ports are None.'''
return NetIdentity(host, tcp_port or None, ssl_port or None, suffix)
@classmethod
def real_name(cls, identity):
self._identities.append(env.identity)
if env.tor_identity.host.endswith('.onion'):
self._identities.append(env.tor_identity)
def real_name(self, host, protocol_version, tcp_port, ssl_port):
'''Real name as used on IRC.'''
default_ports = self.env.coin.PEER_DEFAULT_PORTS
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
if port == default_ports.get(letter):
return letter
else:
return ' ' + letter + str(port)
return letter + str(port)
parts = [host, 'v' + protocol_version]
for letter, port in (('s', ssl_port), ('t', tcp_port)):
if port:
parts.append(port_text(letter, port))
return ' '.join(parts)
tcp = port_text('t', identity.tcp_port)
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def irc_name_pairs(self):
return [(self.real_name(identity.host, self.PROTOCOL_VERSION,
identity.tcp_port, identity.ssl_port),
identity.nick_suffix)
for identity in self._identities]
def identities(self):
'''Return a list of network identities of this server.'''
return self._identities
async def refresh_peer_subs(self):
for n in itertools.count():
await asyncio.sleep(60)
updates = [self.irc_peers[nick] for nick in self.updated_nicks
if nick in self.irc_peers]
if updates:
self.controller.notify_peers(updates)
self.updated_nicks.clear()
def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.'''
return self.controller.ensure_future(coro, callback=callback)
async def main_loop(self):
'''Not a loop for now...'''
self.controller.ensure_future(self.refresh_peer_subs())
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self._identities]
self.controller.ensure_future(self.irc.start(name_pairs))
self.ensure_future(self.irc.start(self.irc_name_pairs()))
else:
self.logger.info('IRC is disabled')
@ -102,7 +84,6 @@ class PeerManager(util.LoggedClass):
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.updated_nicks.add(nick)
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
@ -122,9 +103,9 @@ class PeerManager(util.LoggedClass):
def count(self):
return len(self.irc_peers)
def peer_dict(self):
def rpc_data(self):
return self.irc_peers
def peer_list(self):
def on_peers_subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.'''
return list(self.irc_peers.values())

44
server/session.py

@ -8,7 +8,6 @@
'''Classes for local RPC server and remote client TCP/SSL servers.'''
import time
import traceback
from functools import partial
from lib.jsonrpc import JSONSession, RPCError
@ -52,16 +51,8 @@ class SessionBase(JSONSession):
super().close_connection()
def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
peer_info = self.peer_info()
if not peer_info:
return 'unknown'
if for_log and self.anon_logs:
return 'xx.xx.xx.xx:xx'
if ':' in peer_info[0]:
return '[{}]:{}'.format(peer_info[0], peer_info[1])
else:
return '{}:{}'.format(peer_info[0], peer_info[1])
'''Return the peer address and port.'''
return self.peer_addr(anon=for_log and self.anon_logs)
def flags(self):
'''Status flags.'''
@ -104,7 +95,6 @@ class ElectrumX(SessionBase):
super().__init__(*args, **kwargs)
self.subscribe_headers = False
self.subscribe_height = False
self.subscribe_peers = False
self.notified_height = None
self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs
@ -169,22 +159,9 @@ class ElectrumX(SessionBase):
self.subscribe_height = True
return self.height()
def peers_subscribe(self, incremental=False):
'''Returns the server peers as a list of (ip, host, details) tuples.
If incremental is False there is no subscription. If True the
remote side will receive notifications of new or modified
peers (peers that disappeared are not notified).
'''
self.subscribe_peers = incremental
return self.controller.peers.peer_list()
def notify_peers(self, updates):
'''Notify of peer updates. Updates are sent as a list in the same
format as the subscription reply, as the first parameter.
'''
if self.subscribe_peers:
self.send_notification('server.peers.subscribe', [updates])
def peers_subscribe(self):
'''Return the server peers as a list of (ip, host, details) tuples.'''
return self.controller.peer_mgr.on_peers_subscribe()
async def address_subscribe(self, address):
'''Subscribe to an address.
@ -201,16 +178,17 @@ class ElectrumX(SessionBase):
def server_features(self):
'''Returns a dictionary of server features.'''
peers = self.controller.peers
peer_mgr = self.controller.peer_mgr
hosts = {identity.host: {
'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port,
'pruning': peers.pruning,
'version': peers.VERSION,
} for identity in self.controller.peers.identities()}
} for identity in peer_mgr.identities()}
return {
'hosts': hosts,
'pruning': peer_mgr.pruning,
'protocol_version': peer_mgr.PROTOCOL_VERSION,
'server_version': VERSION,
}
def server_version(self, client_name=None, protocol_version=None):
@ -220,7 +198,7 @@ class ElectrumX(SessionBase):
protocol_version: the protocol version spoken by the client
'''
if client_name:
self.client = str(client_name)[:15]
self.client = str(client_name)[:17]
if protocol_version is not None:
self.protocol_version = protocol_version
return VERSION

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.10.13"
VERSION = "ElectrumX 0.10.14"

Loading…
Cancel
Save