Browse Source

Implement peer subscriptions

Incremental updates are passed.
Also implement a server.features RPC
master
Neil Booth 8 years ago
parent
commit
e56f188816
  1. 2
      docs/ENVIRONMENT.rst
  2. 11
      server/controller.py
  3. 48
      server/peers.py
  4. 35
      server/session.py

2
docs/ENVIRONMENT.rst

@ -89,7 +89,7 @@ These environment variables are optional:
* **SSL_PORT** * **SSL_PORT**
If set ElectrumX will serve SSL clients on **HOST**:**SSL_PORT**. If set ElectrumX will serve SSL clients on **HOST**:**SSL_PORT**.
If set SSL_CERTFILE and SSL_KEYFILE must be defined and be If set then SSL_CERTFILE and SSL_KEYFILE must be defined and be
filesystem paths to those SSL files. filesystem paths to those SSL files.
* **RPC_PORT** * **RPC_PORT**

11
server/controller.py

@ -91,12 +91,10 @@ class Controller(util.LoggedClass):
('server', ('server',
'banner donation_address'), 'banner donation_address'),
] ]
handlers = {'.'.join([prefix, suffix]): self.electrumx_handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_')) getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
handlers['server.peers.subscribe'] = self.peers.subscribe
self.electrumx_handlers = handlers
async def mempool_transactions(self, hashX): async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -358,6 +356,11 @@ class Controller(util.LoggedClass):
for session in sessions: for session in sessions:
await session.notify(self.bp.db_height, touched) await session.notify(self.bp.db_height, touched)
def notify_peers(self, updates):
'''Notify of peer updates.'''
for session in self.sessions:
session.notify_peers(updates)
def electrum_header(self, height): def electrum_header(self, height):
'''Return the binary header at the given height.''' '''Return the binary header at the given height.'''
if not 0 <= height <= self.bp.db_height: if not 0 <= height <= self.bp.db_height:
@ -605,7 +608,7 @@ class Controller(util.LoggedClass):
def rpc_peers(self): def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.''' '''Return a list of server peers, currently taken from IRC.'''
return self.peers.peer_list() return self.peers.peer_dict()
def rpc_reorg(self, count=3): def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks. '''Force a reorg of the given number of blocks.

48
server/peers.py

@ -7,6 +7,8 @@
'''Peer management.''' '''Peer management.'''
import asyncio
import itertools
import socket import socket
from collections import namedtuple from collections import namedtuple
@ -32,21 +34,28 @@ class PeerManager(util.LoggedClass):
self.env = env self.env = env
self.controller = controller self.controller = controller
self.irc = IRC(env, self) self.irc = IRC(env, self)
self.identities = [] self.pruning = None
self._identities = []
# Keyed by nick # Keyed by nick
self.irc_peers = {} self.irc_peers = {}
self.updated_nicks = set()
# We can have a Tor identity inaddition to a normal one # We can have a Tor identity inaddition to a normal one
self.identities.append(NetIdentity(env.report_host, self._identities.append(self.identity(env.report_host,
env.report_tcp_port, env.report_tcp_port,
env.report_ssl_port, env.report_ssl_port,
'')) ''))
if env.report_host_tor.endswith('.onion'): if env.report_host_tor.endswith('.onion'):
self.identities.append(NetIdentity(env.report_host_tor, self._identities.append(self.identity(env.report_host_tor,
env.report_tcp_port_tor, env.report_tcp_port_tor,
env.report_ssl_port_tor, env.report_ssl_port_tor,
'_tor')) '_tor'))
@classmethod
def identity(self, host, tcp_port, ssl_port, suffix):
'''Returns a NetIdentity object. Unpublished ports are None.'''
return NetIdentity(host, tcp_port or None, ssl_port or None, suffix)
@classmethod @classmethod
def real_name(cls, identity): def real_name(cls, identity):
'''Real name as used on IRC.''' '''Real name as used on IRC.'''
@ -62,19 +71,29 @@ class PeerManager(util.LoggedClass):
ssl = port_text('s', identity.ssl_port) ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl) return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def start_irc(self): def identities(self):
'''Start up the IRC connections if enabled.''' '''Return a list of network identities of this server.'''
return self._identities
async def refresh_peer_subs(self):
for n in itertools.count():
await asyncio.sleep(60)
updates = [self.irc_peers[nick] for nick in self.updated_nicks
if nick in self.irc_peers]
if updates:
self.controller.notify_peers(updates)
self.updated_nicks.clear()
async def main_loop(self):
'''Not a loop for now...'''
self.controller.ensure_future(self.refresh_peer_subs())
if self.env.irc: if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix) name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self.identities] for identity in self._identities]
self.controller.ensure_future(self.irc.start(name_pairs)) self.controller.ensure_future(self.irc.start(name_pairs))
else: else:
self.logger.info('IRC is disabled') self.logger.info('IRC is disabled')
async def main_loop(self):
'''Main loop. No loop for now.'''
self.start_irc()
def dns_lookup_peer(self, nick, hostname, details): def dns_lookup_peer(self, nick, hostname, details):
try: try:
ip_addr = None ip_addr = None
@ -83,6 +102,7 @@ class PeerManager(util.LoggedClass):
except socket.error: except socket.error:
pass # IPv6? pass # IPv6?
ip_addr = ip_addr or hostname ip_addr = ip_addr or hostname
self.updated_nicks.add(nick)
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})' self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details)) .format(nick, hostname, details))
@ -102,11 +122,9 @@ class PeerManager(util.LoggedClass):
def count(self): def count(self):
return len(self.irc_peers) return len(self.irc_peers)
def peer_list(self): def peer_dict(self):
return self.irc_peers return self.irc_peers
def subscribe(self): def peer_list(self):
'''Returns the server peers as a list of (ip, host, details) tuples. '''Returns the server peers as a list of (ip, host, details) tuples.'''
Despite the name this is not currently treated as a subscription.'''
return list(self.irc_peers.values()) return list(self.irc_peers.values())

35
server/session.py

@ -7,7 +7,6 @@
'''Classes for local RPC server and remote client TCP/SSL servers.''' '''Classes for local RPC server and remote client TCP/SSL servers.'''
import time import time
import traceback import traceback
from functools import partial from functools import partial
@ -105,6 +104,7 @@ class ElectrumX(SessionBase):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.subscribe_peers = False
self.notified_height = None self.notified_height = None
self.max_send = self.env.max_send self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs self.max_subs = self.env.max_session_subs
@ -114,6 +114,8 @@ class ElectrumX(SessionBase):
'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.headers.subscribe': self.headers_subscribe,
'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe,
'blockchain.transaction.broadcast': self.transaction_broadcast, 'blockchain.transaction.broadcast': self.transaction_broadcast,
'server.features': self.server_features,
'server.peers.subscribe': self.peers_subscribe,
'server.version': self.server_version, 'server.version': self.server_version,
} }
@ -167,6 +169,23 @@ class ElectrumX(SessionBase):
self.subscribe_height = True self.subscribe_height = True
return self.height() return self.height()
def peers_subscribe(self, incremental=False):
'''Returns the server peers as a list of (ip, host, details) tuples.
If incremental is False there is no subscription. If True the
remote side will receive notifications of new or modified
peers (peers that disappeared are not notified).
'''
self.subscribe_peers = incremental
return self.controller.peers.peer_list()
def notify_peers(self, updates):
'''Notify of peer updates. Updates are sent as a list in the same
format as the subscription reply, as the first parameter.
'''
if self.subscribe_peers:
self.send_notification('server.peers.subscribe', [updates])
async def address_subscribe(self, address): async def address_subscribe(self, address):
'''Subscribe to an address. '''Subscribe to an address.
@ -180,6 +199,20 @@ class ElectrumX(SessionBase):
self.hashX_subs[hashX] = address self.hashX_subs[hashX] = address
return status return status
def server_features(self):
'''Returns a dictionary of server features.'''
peers = self.controller.peers
hosts = {identity.host: {
'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port,
'pruning': peers.pruning,
'version': peers.VERSION,
} for identity in self.controller.peers.identities()}
return {
'hosts': hosts,
}
def server_version(self, client_name=None, protocol_version=None): def server_version(self, client_name=None, protocol_version=None):
'''Returns the server version as a string. '''Returns the server version as a string.

Loading…
Cancel
Save