Browse Source

Preparations for peer discovery

DEFAULT_PORTS is a now a coin property.
Other miscellaneous updates to get things closer
master
Neil Booth 8 years ago
parent
commit
06c8eda161
  1. 27
      lib/coins.py
  2. 14
      lib/jsonrpc.py
  3. 17
      lib/util.py
  4. 45
      server/controller.py
  5. 11
      server/db.py
  6. 34
      server/env.py
  7. 75
      server/peers.py
  8. 44
      server/session.py

27
lib/coins.py

@ -40,6 +40,9 @@ class Coin(object):
IRC_SERVER = "irc.freenode.net"
IRC_PORT = 6667
HASHX_LEN = 11
# Peer discovery
PEER_DEFAULT_PORTS = {'t':'50001', 's':'50002'}
PEERS = []
@classmethod
def lookup_coin_class(cls, name, net):
@ -274,6 +277,21 @@ class Bitcoin(Coin):
IRC_PREFIX = "E_"
IRC_CHANNEL = "#electrum"
RPC_PORT = 8332
PEERS = [
'4cii7ryno5j3axe4.onion t'
'btc.smsys.me s995',
'ca6ulp2j2mpsft3y.onion s t',
'electrum.be s t',
'electrum.trouth.net s t',
'electrum.vom-stausee.de s t',
'electrum3.hachre.de s t',
'Electrum.hsmiths.com s t',
'erbium1.sytes.net s t',
'h.1209k.com s t',
'helicarrier.bauerj.eu s t',
'ozahtqwp25chjdjd.onion s t',
'us11.einfachmalnettsein.de s t',
]
class BitcoinTestnet(Bitcoin):
@ -292,7 +310,14 @@ class BitcoinTestnet(Bitcoin):
TX_PER_BLOCK = 21
IRC_PREFIX = "ET_"
RPC_PORT = 18332
PEER_DEFAULT_PORTS = {'t':'51001', 's':'51002'}
PEERS = [
'electrum.akinbo.org s t',
'he36kyperp3kbuxu.onion s t',
'electrum-btc-testnet.petrkr.net s t',
'h.hsmiths.com t53011 s53012',
'testnet.not.fyi s t',
]
class BitcoinTestnetSegWit(BitcoinTestnet):
'''Bitcoin Testnet for Core bitcoind >= 0.13.1.

14
lib/jsonrpc.py

@ -452,6 +452,8 @@ class JSONSessionBase(util.LoggedClass):
return self.response_bytes(result, payload['id'])
except RPCError as e:
return self.error_bytes(e.msg, e.code, self.payload_id(payload))
except asyncio.CancelledError:
raise
except Exception:
self.log_error(traceback.format_exc())
return self.error_bytes('internal error processing request',
@ -701,3 +703,15 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
def send_bytes(self, binary):
'''Send JSON text over the transport.'''
self.transport.writelines((binary, b'\n'))
def peer_addr(self, anon=True):
'''Return the peer address and port.'''
peer_info = self.peer_info()
if not peer_info:
return 'unknown'
if anon:
return 'xx.xx.xx.xx:xx'
if ':' in peer_info[0]:
return '[{}]:{}'.format(peer_info[0], peer_info[1])
else:
return '{}:{}'.format(peer_info[0], peer_info[1])

17
lib/util.py

@ -181,11 +181,16 @@ class LogicalFile(object):
'''
file_num, offset = divmod(start, self.file_size)
filename = self.filename_fmt.format(file_num)
try:
f= open(filename, 'rb+')
except FileNotFoundError:
if not create:
raise
f = open(filename, 'wb+')
f = open_file(filename, create)
f.seek(offset)
return f
def open_file(filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise

45
server/controller.py

@ -53,7 +53,7 @@ class Controller(util.LoggedClass):
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peers = PeerManager(env, self)
self.peer_mgr = PeerManager(env, self)
self.env = env
self.servers = {}
# Map of session to the key of its list in self.groups
@ -65,7 +65,7 @@ class Controller(util.LoggedClass):
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.futures = set()
self.futures = {}
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
@ -163,7 +163,7 @@ class Controller(util.LoggedClass):
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.server_summary()))
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
await asyncio.sleep(1)
@ -208,28 +208,31 @@ class Controller(util.LoggedClass):
'''Schedule running func in the executor, return a task.'''
return self.ensure_future(self.run_in_executor(func, *args))
def ensure_future(self, coro):
def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.'''
future = asyncio.ensure_future(coro)
future.add_done_callback(self.on_future_done)
self.futures.add(future)
self.futures[future] = callback
return future
def on_future_done(self, future):
'''Collect the result of a future after removing it from our set.'''
self.futures.remove(future)
try:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.log_error(traceback.format_exc())
callback = self.futures.pop(future)
if callback:
callback(future)
else:
try:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.log_error(traceback.format_exc())
async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
self.ensure_future(self.peers.main_loop())
self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
@ -444,7 +447,7 @@ class Controller(util.LoggedClass):
'''The number of connections that we've sent something to.'''
return len(self.sessions)
def server_summary(self):
def getinfo(self):
'''A one-line summary of server state.'''
return {
'daemon_height': self.daemon.cached_height(),
@ -455,7 +458,7 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(),
'peers': self.peers.count(),
'peers': self.peer_mgr.count(),
'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self.session_count(),
'subs': self.sub_count(),
@ -517,7 +520,7 @@ class Controller(util.LoggedClass):
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<6} {:<5} {:>15} {:>5} {:>5} '
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
@ -596,20 +599,20 @@ class Controller(util.LoggedClass):
def rpc_getinfo(self):
'''Return summary information about the server process.'''
return self.server_summary()
return self.getinfo()
def rpc_groups(self):
'''Return statistics about the session groups.'''
return self.group_data()
def rpc_peers(self):
'''Return a list of data about server peers.'''
return self.peer_mgr.rpc_data()
def rpc_sessions(self):
'''Return statistics about connected sessions.'''
return self.session_data(for_log=False)
def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.'''
return self.peers.peer_dict()
def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.

11
server/db.py

@ -99,7 +99,7 @@ class DB(util.LoggedClass):
self.logger.info('created new database')
self.logger.info('creating metadata diretcory')
os.mkdir('meta')
with self.open_file('COIN', create=True) as f:
with util.open_file('COIN', create=True) as f:
f.write('ElectrumX databases and metadata for {} {}'
.format(self.coin.NAME, self.coin.NET).encode())
else:
@ -183,15 +183,6 @@ class DB(util.LoggedClass):
self.clear_excess_history(self.utxo_flush_count)
self.clear_excess_undo_info()
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.

34
server/env.py

@ -8,12 +8,16 @@
'''Class for handling environment configuration and defaults.'''
from collections import namedtuple
from os import environ
from lib.coins import Coin
from lib.util import LoggedClass
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
class Env(LoggedClass):
'''Wraps environment configuration.'''
@ -55,18 +59,24 @@ class Env(LoggedClass):
# IRC
self.irc = self.default('IRC', False)
self.irc_nick = self.default('IRC_NICK', None)
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)
self.report_host = self.default('REPORT_HOST', self.host)
self.report_tcp_port_tor = self.integer('REPORT_TCP_PORT_TOR',
self.report_tcp_port
if self.report_tcp_port else
self.tcp_port)
self.report_ssl_port_tor = self.integer('REPORT_SSL_PORT_TOR',
self.report_ssl_port
if self.report_ssl_port else
self.ssl_port)
self.report_host_tor = self.default('REPORT_HOST_TOR', '')
self.identity = NetIdentity(
self.default('REPORT_HOST', self.host),
self.integer('REPORT_TCP_PORT', self.tcp_port) or None,
self.integer('REPORT_SSL_PORT', self.ssl_port) or None,
''
)
self.tor_identity = NetIdentity(
self.default('REPORT_HOST_TOR', ''), # must be a string
self.integer('REPORT_TCP_PORT_TOR',
self.identity.tcp_port
if self.identity.tcp_port else
self.tcp_port) or None,
self.integer('REPORT_SSL_PORT_TOR',
self.identity.ssl_port
if self.identity.ssl_port else
self.ssl_port) or None,
'_tor'
)
def default(self, envvar, default):
return environ.get(envvar, default)

75
server/peers.py

@ -8,7 +8,6 @@
'''Peer management.'''
import asyncio
import itertools
import socket
from collections import namedtuple
@ -16,7 +15,6 @@ import lib.util as util
from server.irc import IRC
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
@ -26,8 +24,7 @@ class PeerManager(util.LoggedClass):
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
PROTOCOL_VERSION = '1.0'
def __init__(self, env, controller):
super().__init__()
@ -38,59 +35,44 @@ class PeerManager(util.LoggedClass):
self._identities = []
# Keyed by nick
self.irc_peers = {}
self.updated_nicks = set()
# We can have a Tor identity inaddition to a normal one
self._identities.append(self.identity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self._identities.append(self.identity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
@classmethod
def identity(self, host, tcp_port, ssl_port, suffix):
'''Returns a NetIdentity object. Unpublished ports are None.'''
return NetIdentity(host, tcp_port or None, ssl_port or None, suffix)
@classmethod
def real_name(cls, identity):
self._identities.append(env.identity)
if env.tor_identity.host.endswith('.onion'):
self._identities.append(env.tor_identity)
def real_name(self, host, protocol_version, tcp_port, ssl_port):
'''Real name as used on IRC.'''
default_ports = self.env.coin.PEER_DEFAULT_PORTS
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
if port == default_ports.get(letter):
return letter
else:
return ' ' + letter + str(port)
return letter + str(port)
parts = [host, 'v' + protocol_version]
for letter, port in (('s', ssl_port), ('t', tcp_port)):
if port:
parts.append(port_text(letter, port))
return ' '.join(parts)
tcp = port_text('t', identity.tcp_port)
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def irc_name_pairs(self):
return [(self.real_name(identity.host, self.PROTOCOL_VERSION,
identity.tcp_port, identity.ssl_port),
identity.nick_suffix)
for identity in self._identities]
def identities(self):
'''Return a list of network identities of this server.'''
return self._identities
async def refresh_peer_subs(self):
for n in itertools.count():
await asyncio.sleep(60)
updates = [self.irc_peers[nick] for nick in self.updated_nicks
if nick in self.irc_peers]
if updates:
self.controller.notify_peers(updates)
self.updated_nicks.clear()
def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.'''
return self.controller.ensure_future(coro, callback=callback)
async def main_loop(self):
'''Not a loop for now...'''
self.controller.ensure_future(self.refresh_peer_subs())
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self._identities]
self.controller.ensure_future(self.irc.start(name_pairs))
self.ensure_future(self.irc.start(self.irc_name_pairs()))
else:
self.logger.info('IRC is disabled')
@ -102,7 +84,6 @@ class PeerManager(util.LoggedClass):
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.updated_nicks.add(nick)
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
@ -122,9 +103,9 @@ class PeerManager(util.LoggedClass):
def count(self):
return len(self.irc_peers)
def peer_dict(self):
def rpc_data(self):
return self.irc_peers
def peer_list(self):
def on_peers_subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.'''
return list(self.irc_peers.values())

44
server/session.py

@ -8,7 +8,6 @@
'''Classes for local RPC server and remote client TCP/SSL servers.'''
import time
import traceback
from functools import partial
from lib.jsonrpc import JSONSession, RPCError
@ -52,16 +51,8 @@ class SessionBase(JSONSession):
super().close_connection()
def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
peer_info = self.peer_info()
if not peer_info:
return 'unknown'
if for_log and self.anon_logs:
return 'xx.xx.xx.xx:xx'
if ':' in peer_info[0]:
return '[{}]:{}'.format(peer_info[0], peer_info[1])
else:
return '{}:{}'.format(peer_info[0], peer_info[1])
'''Return the peer address and port.'''
return self.peer_addr(anon=for_log and self.anon_logs)
def flags(self):
'''Status flags.'''
@ -104,7 +95,6 @@ class ElectrumX(SessionBase):
super().__init__(*args, **kwargs)
self.subscribe_headers = False
self.subscribe_height = False
self.subscribe_peers = False
self.notified_height = None
self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs
@ -169,22 +159,9 @@ class ElectrumX(SessionBase):
self.subscribe_height = True
return self.height()
def peers_subscribe(self, incremental=False):
'''Returns the server peers as a list of (ip, host, details) tuples.
If incremental is False there is no subscription. If True the
remote side will receive notifications of new or modified
peers (peers that disappeared are not notified).
'''
self.subscribe_peers = incremental
return self.controller.peers.peer_list()
def notify_peers(self, updates):
'''Notify of peer updates. Updates are sent as a list in the same
format as the subscription reply, as the first parameter.
'''
if self.subscribe_peers:
self.send_notification('server.peers.subscribe', [updates])
def peers_subscribe(self):
'''Return the server peers as a list of (ip, host, details) tuples.'''
return self.controller.peer_mgr.on_peers_subscribe()
async def address_subscribe(self, address):
'''Subscribe to an address.
@ -201,16 +178,17 @@ class ElectrumX(SessionBase):
def server_features(self):
'''Returns a dictionary of server features.'''
peers = self.controller.peers
peer_mgr = self.controller.peer_mgr
hosts = {identity.host: {
'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port,
'pruning': peers.pruning,
'version': peers.VERSION,
} for identity in self.controller.peers.identities()}
} for identity in peer_mgr.identities()}
return {
'hosts': hosts,
'pruning': peer_mgr.pruning,
'protocol_version': peer_mgr.PROTOCOL_VERSION,
'server_version': VERSION,
}
def server_version(self, client_name=None, protocol_version=None):
@ -220,7 +198,7 @@ class ElectrumX(SessionBase):
protocol_version: the protocol version spoken by the client
'''
if client_name:
self.client = str(client_name)[:15]
self.client = str(client_name)[:17]
if protocol_version is not None:
self.protocol_version = protocol_version
return VERSION

Loading…
Cancel
Save