From 151da40d5b6be87bb95b7c8641262138f0333163 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 27 Jan 2017 08:22:37 +0900 Subject: [PATCH] Implement peer discovery protocol Closes #104 DEFAULT_PORTS now a coin property A Peer object maintains peer information Revamp LocalRPC "peers" call to show a lot more information Have lib/jsonrpc.py take care of handling request timeouts Save and restore peers to a file Loosen JSON RPC rules so we work with electrum-server and beancurd which don't follow the spec. Handle incoming server.add_peer requests Send server.add_peer registrations if peer doesn't have us or correct ports Verify peers at regular intervals, forget stale peers, verify new peers or those with updated ports If connecting via one port fails, try the other Add socks.py for SOCKS4 and SOCKS5 proxying, so Tor servers can now be reached by TCP and SSL Put full licence boilerplate in lib/ files Disable IRC advertising on testnet Serve a Tor banner file if it seems like a connection came from your tor proxy (see ENVIONMENT.rst) Retry tor proxy hourly, and peers that are about to turn stale Report more onion peers to a connection that seems to be combing from your tor proxy Only report good peers to server.peers.subscribe; always report self if valid Handle peers on the wrong network robustly Default to 127.0.0.1 rather than localhost for Python <= 3.5.2 compatibility Put peer name in logs of connections to it Update docs --- LICENCE | 2 +- README.rst | 2 +- docs/ENVIRONMENT.rst | 37 +++ docs/PEER_DISCOVERY.rst | 4 +- docs/PROTOCOL.rst | 4 +- docs/RPC-INTERFACE.rst | 29 ++- electrumx_rpc.py | 9 +- lib/coins.py | 43 +++- lib/hash.py | 22 +- lib/jsonrpc.py | 147 ++++++++--- lib/peer.py | 294 ++++++++++++++++++++++ lib/script.py | 21 +- lib/socks.py | 181 +++++++++++++ lib/tx.py | 21 +- lib/util.py | 42 +++- server/controller.py | 100 ++++---- server/daemon.py | 1 + server/db.py | 1 + server/env.py | 48 ++-- server/irc.py | 12 +- server/peers.py | 544 +++++++++++++++++++++++++++++++++++----- server/session.py | 78 ++++-- server/version.py | 4 +- 23 files changed, 1415 insertions(+), 231 deletions(-) create mode 100644 lib/peer.py create mode 100644 lib/socks.py diff --git a/LICENCE b/LICENCE index db7d8c9..eac7aaf 100644 --- a/LICENCE +++ b/LICENCE @@ -1,4 +1,4 @@ -Copyright (c) 2016, Neil Booth +Copyright (c) 2016-2017, Neil Booth All rights reserved. diff --git a/README.rst b/README.rst index f981cf2..462986c 100644 --- a/README.rst +++ b/README.rst @@ -8,7 +8,7 @@ ElectrumX - Reimplementation of electrum-server =============================================== :Licence: MIT - :Language: Python (>= 3.5) + :Language: Python (>= 3.5.1) :Author: Neil Booth Getting Started diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 07c5204..e799b2a 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -122,6 +122,11 @@ These environment variables are optional: + **$DONATION_ADDRESS** is replaced with the address from the **DONATION_ADDRESS** environment variable. +* **TOR_BANNER_FILE** + + As for **BANNER_FILE** (which is also the default) but shown to + incoming connections believed to be to your Tor hidden service. + * **ANON_LOGS** Set to anything non-empty to replace IP addresses in logs with @@ -207,6 +212,33 @@ raise them. functioning Electrum clients by default will send pings roughly every 60 seconds. +TOR +--- + +In response to the `server.peers.subscribe` RPC call, ElectrumX will +only return peer servers that is has recently connected to and +verified basic functionality. + +If you are not running a Tor proxy ElectrumX will be unable to connect +to onion server peers, in which case rather than returning no onion +peers it will fall back to a hard-coded list. + +To give incoming clients a full range of onion servers you will need +to be running a Tor proxy for ElectrumX to use. + +* **TOR_PROXY_HOST** + + The host where the Tor proxy is running. Defaults to *127.0.0.1*. + If you use a hostname here rather than an IP address, you must have + Python version >= 3.5.3, Python 3.5.2 will **not** work. + +* **TOR_PROXY_PORT** + + The port on which the Tor proxy is running. If not set, ElectrumX + will autodetect any proxy running on the usual ports 9050 (Tor), + 9150 (Tor browser bundle) and 1080 (socks). + + IRC --- @@ -255,6 +287,11 @@ connectivity on IRC: unless it is '0', otherwise **SSL_PORT**. '0' disables publishing the port. + **NOTE**: Certificate-Authority signed certificates don't work over + Tor, so you should set **REPORT_SSL_PORT_TOR** to 0 if yours is not + self-signed. + + Cache ----- diff --git a/docs/PEER_DISCOVERY.rst b/docs/PEER_DISCOVERY.rst index 1187841..a69b9c3 100644 --- a/docs/PEER_DISCOVERY.rst +++ b/docs/PEER_DISCOVERY.rst @@ -160,8 +160,8 @@ Unknown keys should be silently ignored. * **protocol_min** Strings that are the minimum and maximum Electrum protcol versions - this server speaks. Should be the same as what would suffix the - letter **v** in the IRC real name. Example: "1.1". + this server speaks. The maximum value should be the same as what + would suffix the letter **v** in the IRC real name. Example: "1.1". * **pruning** diff --git a/docs/PROTOCOL.rst b/docs/PROTOCOL.rst index e5e372d..5f3151e 100644 --- a/docs/PROTOCOL.rst +++ b/docs/PROTOCOL.rst @@ -723,8 +723,8 @@ Get a list of features and services supported by the server. * **pruning** - The history pruning limit of the server. If the server does not - prune return *null*. + The history pruning limit of the server as an integer. If the + server does not prune return *null*. **Example Response** diff --git a/docs/RPC-INTERFACE.rst b/docs/RPC-INTERFACE.rst index 4816838..bb0aed9 100644 --- a/docs/RPC-INTERFACE.rst +++ b/docs/RPC-INTERFACE.rst @@ -62,10 +62,10 @@ The following commands are available: Sessions are put into groups, primarily as an anti-DoS measure. Initially all connections made within a period of time are put in the same group. High bandwidth usage by a member of a group - deprioritizes itself, and all members of its group to a lesser + deprioritizes that session, and all members of its group to a lesser extent. Low-priority sessions have their requests served after higher priority sessions. ElectrumX will start delaying responses - to a sessions if it becomes sufficiently deprioritized. + to a session if it becomes sufficiently deprioritized. * **sessions** @@ -125,7 +125,7 @@ The following commands are available: ElectrumX initiates the socket close process for the passed sessions. Whilst most connections close quickly, it can take - several minutes for Python to close some SSL connections down. + several minutes for Python to shut some SSL connections down. * **log** @@ -153,22 +153,25 @@ The following commands are available: Returns a list of peer electrum servers. This command takes no arguments. - Currently this is data gleaned from an IRC session. + Currently peer data is obtained via a peer discovery protocol; it + used to be taken from IRC. * **daemon_url** - This command takes an option argument that is interpreted - identically to the **DAEMON_URL** environment variable. If default - value of the argument is the **DAEMON_URL** environment variable. + This command takes an optional argument that is interpreted + identically to the **DAEMON_URL** environment variable. If omitted, + the default argument value is the process's **DAEMON_URL** + environment variable. - The command replaces the daemon's URL at run-time, and rotates to the - first in the list. + This command replaces the daemon's URL at run-time, and also + forecefully rotates to the first URL in the list. - For example, in case ElectrumX has rotated to a secondary daemon and - you want to revert to the first after fixing the issue, call this - command without an argument. + For example, in case ElectrumX has previously failed over to a + secondary daemon and you want to revert to the primary having + resolved the connectivity issue, invoking this command without an + argument will have that effect. * **reorg** Force a block chain reorg. This command takes an optional - argument - the number of blocks to reorg - that defaults to 3. + argument - the number of blocks to reorg - which defaults to 3. diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 122a7df..8666df6 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -40,12 +40,9 @@ class RPCClient(JSONSession): self.send_request(handler, method, params) def handle_response(self, method, result, error): - if method in ('groups', 'sessions') and not error: - if method == 'groups': - lines = Controller.groups_text_lines(result) - else: - lines = Controller.sessions_text_lines(result) - for line in lines: + if method in ('groups', 'peers', 'sessions') and not error: + lines_func = getattr(Controller, '{}_text_lines'.format(method)) + for line in lines_func(result): print(line) elif error: print('error: {} (code {:d})' diff --git a/lib/coins.py b/lib/coins.py index 11e4f04..aaac853 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Module providing coin abstraction. @@ -37,6 +55,7 @@ class Coin(object): RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?') VALUE_PER_COIN = 100000000 CHUNK_SIZE = 2016 + IRC_PREFIX = None IRC_SERVER = "irc.freenode.net" IRC_PORT = 6667 HASHX_LEN = 11 @@ -50,7 +69,7 @@ class Coin(object): Raise an exception if unrecognised.''' req_attrs = ('TX_COUNT', 'TX_COUNT_HEIGHT', 'TX_PER_BLOCK', - 'IRC_CHANNEL', 'IRC_PREFIX') + 'IRC_CHANNEL') for coin in util.subclasses(Coin): if (coin.NAME.lower() == name.lower() and coin.NET.lower() == net.lower()): @@ -285,25 +304,28 @@ class Bitcoin(Coin): IRC_CHANNEL = "#electrum" RPC_PORT = 8332 PEERS = [ - '4cii7ryno5j3axe4.onion t' 'btc.smsys.me s995', 'ca6ulp2j2mpsft3y.onion s t', 'electrum.be s t', - 'electrum.trouth.net s t', + 'electrum.trouth.net p10000 s t', 'electrum.vom-stausee.de s t', - 'electrum3.hachre.de s t', + 'electrum3.hachre.de p10000 s t', 'electrum.hsmiths.com s t', 'erbium1.sytes.net s t', - 'h.1209k.com s t', + 'fdkbwjykvl2f3hup.onion p10000 s t', + 'h.1209k.com p10000 s t', 'helicarrier.bauerj.eu s t', + 'hsmiths4fyqlw5xw.onion s t', 'ozahtqwp25chjdjd.onion s t', 'us11.einfachmalnettsein.de s t', + 'ELEX01.blackpole.online s t', ] class BitcoinTestnet(Bitcoin): SHORTNAME = "XTN" NET = "testnet" + IRC_PREFIX = None XPUB_VERBYTES = bytes.fromhex("043587cf") XPRV_VERBYTES = bytes.fromhex("04358394") P2PKH_VERBYTE = 0x6f @@ -315,16 +337,15 @@ class BitcoinTestnet(Bitcoin): TX_COUNT = 12242438 TX_COUNT_HEIGHT = 1035428 TX_PER_BLOCK = 21 - IRC_PREFIX = "ET_" RPC_PORT = 18332 PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'} PEERS = [ 'electrum.akinbo.org s t', 'he36kyperp3kbuxu.onion s t', 'electrum-btc-testnet.petrkr.net s t', - 'testnet.hsmiths.com t53011 s53012', - 'hsmithsxurybd7uh.onion t53011', - 'testnet.not.fyi s t', + 'testnet.hsmiths.com t53011', + 'hsmithsxurybd7uh.onion t53011 s53012', + 'ELEX05.blackpole.online t52001 s52002', ] diff --git a/lib/hash.py b/lib/hash.py index 7ab28af..4aba0fe 100644 --- a/lib/hash.py +++ b/lib/hash.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Cryptograph hash functions and related classes.''' diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 13f1895..e0d90ae 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Classes for acting as a peer over a transport and speaking the JSON RPC versions 1.0 and 2.0. @@ -41,9 +59,35 @@ class JSONRPC(object): INVALID_ARGS = -32602 INTERNAL_ERROR = -32603 + # Codes for this library + INVALID_RESPONSE = -100 + ERROR_CODE_UNAVAILABLE = -101 + REQUEST_TIMEOUT = -102 + ID_TYPES = (type(None), str, numbers.Number) HAS_BATCHES = False + @classmethod + def canonical_error(cls, error): + '''Convert an error to a JSON RPC 2.0 error. + + Handlers then only have a single form of error to deal with. + ''' + if isinstance(error, int): + error = {'code': error} + elif isinstance(error, str): + error = {'message': error} + elif not isinstance(error, dict): + error = {'data': error} + error['code'] = error.get('code', JSONRPC.ERROR_CODE_UNAVAILABLE) + error['message'] = error.get('message', 'error message unavailable') + return error + + @classmethod + def timeout_error(cls): + return {'message': 'request timed out', + 'code': JSONRPC.REQUEST_TIMEOUT} + class JSONRPCv1(JSONRPC): '''JSON RPC version 1.0.''' @@ -71,23 +115,23 @@ class JSONRPCv1(JSONRPC): @classmethod def handle_response(cls, handler, payload): - '''JSON v1 response handler. Both 'error' and 'response' + '''JSON v1 response handler. Both 'error' and 'result' should exist with exactly one being None. + + Unfortunately many 1.0 clients behave like 2.0, and just send + one or the other. ''' - try: - result = payload['result'] - error = payload['error'] - except KeyError: - pass + error = payload.get('error') + if error is None: + handler(payload.get('result'), None) else: - if (result is None) != (error is None): - handler(result, error) + handler(None, cls.canonical_error(error)) @classmethod def is_request(cls, payload): '''Returns True if the payload (which has a method) is a request. False means it is a notification.''' - return payload.get('id') != None + return payload.get('id') is not None class JSONRPCv2(JSONRPC): @@ -124,17 +168,17 @@ class JSONRPCv2(JSONRPC): @classmethod def handle_response(cls, handler, payload): - '''JSON v2 response handler. Exactly one of 'error' and 'response' + '''JSON v2 response handler. Exactly one of 'error' and 'result' must exist. Errors must have 'code' and 'message' members. ''' - if ('error' in payload) != ('result' in payload): - if 'result' in payload: - handler(payload['result'], None) - else: - error = payload['error'] - if (isinstance(error, dict) and 'code' in error - and 'message' in error): - handler(None, error) + if 'error' in payload: + handler(None, cls.canonical_error(payload['error'])) + elif 'result' in payload: + handler(payload['result'], None) + else: + error = {'message': 'no error or result returned', + 'code': JSONRPC.INVALID_RESPONSE} + handler(None, cls.canonical_error(error)) @classmethod def batch_size(cls, parts): @@ -213,15 +257,49 @@ class JSONSessionBase(util.LoggedClass): responses. Incoming messages are queued. When the queue goes from empty ''' - - NEXT_SESSION_ID = 0 + _next_session_id = 0 + _pending_reqs = {} @classmethod def next_session_id(cls): - session_id = cls.NEXT_SESSION_ID - cls.NEXT_SESSION_ID += 1 + '''Return the next unique session ID.''' + session_id = cls._next_session_id + cls._next_session_id += 1 return session_id + def _pending_request_keys(self): + '''Return a generator of pending request keys for this session.''' + return [key for key in self._pending_reqs if key[0] is self] + + def has_pending_requests(self): + '''Return True if this session has pending requests.''' + return bool(self._pending_request_keys()) + + def pop_response_handler(self, msg_id): + '''Return the response handler for the given message ID.''' + return self._pending_reqs.pop((self, msg_id), (None, None))[0] + + def timeout_session(self): + '''Trigger timeouts for all of the session's pending requests.''' + self._timeout_requests(self._pending_request_keys()) + + @classmethod + def timeout_check(cls): + '''Trigger timeouts where necessary for all pending requests.''' + now = time.time() + keys = [key for key, value in cls._pending_reqs.items() + if value[1] < now] + cls._timeout_requests(keys) + + @classmethod + def _timeout_requests(cls, keys): + '''Trigger timeouts for the given lookup keys.''' + values = [cls._pending_reqs.pop(key) for key in keys] + handlers = [handler for handler, timeout in values] + timeout_error = JSONRPC.timeout_error() + for handler in handlers: + handler(None, timeout_error) + def __init__(self, version=JSONRPCCompat): super().__init__() @@ -245,7 +323,6 @@ class JSONSessionBase(util.LoggedClass): self.batch_results = [] # Handling of outgoing requests self.next_request_id = 0 - self.pending_responses = {} # If buffered incoming data exceeds this the connection is closed self.max_buffer_size = 1000000 self.max_send = 50000 @@ -296,7 +373,7 @@ class JSONSessionBase(util.LoggedClass): '''Extract and return the ID from the payload. Raises an RPCError if it is missing or invalid.''' - if not 'id' in payload: + if 'id' not in payload: raise RPCError('missing id', JSONRPC.INVALID_REQUEST) id_ = payload['id'] @@ -473,7 +550,7 @@ class JSONSessionBase(util.LoggedClass): '''Handle a single JSON response.''' try: id_ = self.check_payload_id(payload) - handler = self.pending_responses.pop(id_, None) + handler = self.pop_response_handler(id_) if handler: self.version.handle_response(handler, payload) else: @@ -593,14 +670,18 @@ class JSONSessionBase(util.LoggedClass): '''Send a JSON error.''' self.send_binary(self.error_bytes(message, code, id_)) - def send_request(self, handler, method, params=None): + def send_request(self, handler, method, params=None, timeout=30): '''Sends a request and arranges for handler to be called with the response when it comes in. + + A call to request_timeout_check() will result in pending + responses that have been waiting more than timeout seconds to + call the handler with a REQUEST_TIMEOUT error. ''' id_ = self.next_request_id self.next_request_id += 1 self.send_binary(self.request_bytes(id_, method, params)) - self.pending_responses[id_] = handler + self._pending_reqs[(self, id_)] = (handler, time.time() + timeout) def send_notification(self, method, params=None): '''Send a notification.''' @@ -679,7 +760,9 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): def peer_info(self): '''Returns information about the peer.''' - return self.transport.get_extra_info('peername') + if self.transport: + return self.transport.get_extra_info('peername') + return None def abort(self): '''Cut the connection abruptly.''' @@ -691,6 +774,10 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): self.transport = transport super().connection_made() + def connection_lost(self, exc): + '''Trigger timeouts of all pending requests.''' + self.timeout_session() + def is_closing(self): '''True if the underlying transport is closing.''' return self.transport and self.transport.is_closing() diff --git a/lib/peer.py b/lib/peer.py new file mode 100644 index 0000000..a5ff3df --- /dev/null +++ b/lib/peer.py @@ -0,0 +1,294 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +'''Representation of a peer server.''' + +import re +from ipaddress import ip_address + +from lib.util import cachedproperty + + +class Peer(object): + + # Protocol version + VERSION_REGEX = re.compile('[0-9]+(\.[0-9]+)?$') + ATTRS = ('host', 'features', + # metadata + 'source', 'ip_addr', 'good_ports', + 'last_connect', 'last_try', 'try_count') + PORTS = ('ssl_port', 'tcp_port') + FEATURES = PORTS + ('pruning', 'server_version', + 'protocol_min', 'protocol_max') + # This should be set by the application + DEFAULT_PORTS = {} + + def __init__(self, host, features, source='unknown', ip_addr=None, + good_ports=[], last_connect=0, last_try=0, try_count=0): + '''Create a peer given a host name (or IP address as a string), + a dictionary of features, and a record of the source.''' + assert isinstance(host, str) + assert isinstance(features, dict) + self.host = host + self.features = features.copy() + # Canonicalize / clean-up + for feature in self.FEATURES: + self.features[feature] = getattr(self, feature) + # Metadata + self.source = source + self.ip_addr = ip_addr + self.good_ports = good_ports.copy() + self.last_connect = last_connect + self.last_try = last_try + self.try_count = try_count + # Transient, non-persisted metadata + self.bad = False + self.other_port_pairs = set() + + @classmethod + def peers_from_features(cls, features, source): + peers = [] + if isinstance(features, dict): + hosts = features.get('hosts') + if isinstance(hosts, dict): + peers = [Peer(host, features, source=source) + for host in hosts if isinstance(host, str)] + return peers + + @classmethod + def deserialize(cls, item): + '''Deserialize from a dictionary.''' + return cls(**item) + + @classmethod + def version_tuple(cls, vstr): + '''Convert a version string, such as "1.2", to a (major_version, + minor_version) pair. + ''' + if isinstance(vstr, str) and VERSION_REGEX.match(vstr): + if not '.' in vstr: + vstr += '.0' + else: + vstr = '1.0' + return tuple(int(part) for part in vstr.split('.')) + + def matches(self, peers): + '''Return peers whose host matches the given peer's host or IP + address. This results in our favouring host names over IP + addresses. + ''' + candidates = (self.host.lower(), self.ip_addr) + return [peer for peer in peers if peer.host.lower() in candidates] + + def __str__(self): + return self.host + + def update_features(self, features): + '''Update features in-place.''' + tmp = Peer(self.host, features) + self.features = tmp.features + for feature in self.FEATURES: + setattr(self, feature, getattr(tmp, feature)) + + def connection_port_pairs(self): + '''Return a list of (kind, port) pairs to try when making a + connection.''' + # Use a list not a set - it's important to try the registered + # ports first. + pairs = [('SSL', self.ssl_port), ('TCP', self.tcp_port)] + while self.other_port_pairs: + pairs.append(other_port_pairs.pop()) + return [pair for pair in pairs if pair[1]] + + def mark_bad(self): + '''Mark as bad to avoid reconnects but also to remember for a + while.''' + self.bad = True + + def check_ports(self, other): + '''Remember differing ports in case server operator changed them + or removed one.''' + if other.ssl_port != self.ssl_port: + self.other_port_pairs.add(('SSL', other.ssl_port)) + if other.tcp_port != self.tcp_port: + self.other_port_pairs.add(('TCP', other.tcp_port)) + return bool(self.other_port_pairs) + + @cachedproperty + def is_tor(self): + return self.host.endswith('.onion') + + @cachedproperty + def is_valid(self): + ip = self.ip_address + if not ip: + return True + return not ip.is_multicast and (ip.is_global or ip.is_private) + + @cachedproperty + def is_public(self): + ip = self.ip_address + return self.is_valid and not (ip and ip.is_private) + + @cachedproperty + def ip_address(self): + '''The host as a python ip_address object, or None.''' + try: + return ip_address(self.host) + except ValueError: + return None + + def bucket(self): + if self.is_tor: + return 'onion' + if not self.ip_addr: + return '' + return tuple(self.ip_addr.split('.')[:2]) + + def serialize(self): + '''Serialize to a dictionary.''' + return {attr: getattr(self, attr) for attr in self.ATTRS} + + def _port(self, key): + hosts = self.features.get('hosts') + if isinstance(hosts, dict): + host = hosts.get(self.host) + port = self._integer(key, host) + if port and 0 < port < 65536: + return port + return None + + def _integer(self, key, d=None): + d = d or self.features + result = d.get(key) if isinstance(d, dict) else None + if isinstance(result, str): + try: + result = int(result) + except ValueError: + pass + return result if isinstance(result, int) else None + + def _string(self, key): + result = self.features.get(key) + return result if isinstance(result, str) else None + + def _version_string(self, key): + version = self.features.get(key) + return '{:d}.{:d}'.format(*self.version_tuple(version)) + + @cachedproperty + def genesis_hash(self): + '''Returns None if no SSL port, otherwise the port as an integer.''' + return self._string('genesis_hash') + + @cachedproperty + def ssl_port(self): + '''Returns None if no SSL port, otherwise the port as an integer.''' + return self._port('ssl_port') + + @cachedproperty + def tcp_port(self): + '''Returns None if no TCP port, otherwise the port as an integer.''' + return self._port('tcp_port') + + @cachedproperty + def server_version(self): + '''Returns the server version as a string if known, otherwise None.''' + return self._string('server_version') + + @cachedproperty + def pruning(self): + '''Returns the pruning level as an integer. None indicates no + pruning.''' + pruning = self._integer('pruning') + if pruning and pruning > 0: + return pruning + return None + + @cachedproperty + def protocol_min(self): + '''Minimum protocol version as a string, e.g., 1.0''' + return self._version_string('protcol_min') + + @cachedproperty + def protocol_max(self): + '''Maximum protocol version as a string, e.g., 1.1''' + return self._version_string('protcol_max') + + def to_tuple(self): + '''The tuple ((ip, host, details) expected in response + to a peers subscription.''' + details = self.real_name().split()[1:] + return (self.ip_addr or self.host, self.host, details) + + def real_name(self, host_override=None): + '''Real name of this peer as used on IRC.''' + def port_text(letter, port): + if port == self.DEFAULT_PORTS.get(letter): + return letter + else: + return letter + str(port) + + parts = [host_override or self.host, 'v' + self.protocol_max] + if self.pruning: + parts.append('p{:d}'.format(self.pruning)) + for letter, port in (('s', self.ssl_port), ('t', self.tcp_port)): + if port: + parts.append(port_text(letter, port)) + return ' '.join(parts) + + @classmethod + def from_real_name(cls, real_name, source): + '''Real name is a real name as on IRC, such as + + "erbium1.sytes.net v1.0 s t" + + Returns an instance of this Peer class. + ''' + host = 'nohost' + features = {} + ports = {} + for n, part in enumerate(real_name.split()): + if n == 0: + host = part + continue + if part[0] in ('s', 't'): + if len(part) == 1: + port = cls.DEFAULT_PORTS[part[0]] + else: + port = part[1:] + if part[0] == 's': + ports['ssl_port'] = port + else: + ports['tcp_port'] = port + elif part[0] == 'v': + features['protocol_max'] = features['protocol_min'] = part[1:] + elif part[0] == 'p': + features['pruning'] = part[1:] + + features.update(ports) + features['hosts'] = {host: ports} + + return cls(host, features, source) diff --git a/lib/script.py b/lib/script.py index fbb2337..2c35184 100644 --- a/lib/script.py +++ b/lib/script.py @@ -2,7 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Script-related classes and functions.''' diff --git a/lib/socks.py b/lib/socks.py new file mode 100644 index 0000000..86fddb6 --- /dev/null +++ b/lib/socks.py @@ -0,0 +1,181 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# and warranty status of this software. + +'''Socks proxying.''' + +import asyncio +import ipaddress +import logging +import socket +import struct +from functools import partial + +import lib.util as util + + +class Socks(util.LoggedClass): + '''Socks protocol wrapper.''' + + SOCKS5_ERRORS = { + 1: 'general SOCKS server failure', + 2: 'connection not allowed by ruleset', + 3: 'network unreachable', + 4: 'host unreachable', + 5: 'connection refused', + 6: 'TTL expired', + 7: 'command not supported', + 8: 'address type not supported', + } + + class Error(Exception): + pass + + def __init__(self, loop, sock, host, port): + super().__init__() + self.loop = loop + self.sock = sock + self.host = host + self.port = port + try: + self.ip_address = ipaddress.ip_address(host) + except ValueError: + self.ip_address = None + self.debug = False + + async def _socks4_handshake(self): + if self.ip_address: + # Socks 4 + ip_addr = self.ip_address + host_bytes = b'' + else: + # Socks 4a + ip_addr = ipaddress.ip_address('0.0.0.1') + host_bytes = self.host.encode() + b'\0' + + user_id = '' + data = b'\4\1' + struct.pack('>H', self.port) + ip_addr.packed + data += user_id.encode() + b'\0' + host_bytes + await self.loop.sock_sendall(self.sock, data) + data = await self.loop.sock_recv(self.sock, 8) + if data[0] != 0: + raise self.Error('proxy sent bad initial Socks4 byte') + if data[1] != 0x5a: + raise self.Error('proxy request failed or rejected') + + async def _socks5_handshake(self): + await self.loop.sock_sendall(self.sock, b'\5\1\0') + data = await self.loop.sock_recv(self.sock, 2) + if data[0] != 5: + raise self.Error('proxy sent bad SOCKS5 initial byte') + if data[1] != 0: + raise self.Error('proxy rejected SOCKS5 authentication method') + + if self.ip_address: + if self.ip_address.version == 4: + addr = b'\1' + self.ip_address.packed + else: + addr = b'\4' + self.ip_address.packed + else: + host = self.host.encode() + addr = b'\3' + bytes([len(host)]) + host + + data = b'\5\1\0' + addr + struct.pack('>H', self.port) + await self.loop.sock_sendall(self.sock, data) + data = await self.loop.sock_recv(self.sock, 5) + if data[0] != 5: + raise self.Error('proxy sent bad SOSCK5 response initial byte') + if data[1] != 0: + msg = self.SOCKS5_ERRORS.get(data[1], 'unknown SOCKS5 error {:d}' + .format(data[1])) + raise self.Error(msg) + if data[3] == 1: + addr_len, data = 3, data[4:] + elif data[3] == 3: + addr_len, data = data[4], b'' + elif data[3] == 4: + addr_len, data = 15, data[4:] + data = await self.loop.sock_recv(self.sock, addr_len + 2) + addr = data[:addr_len] + port, = struct.unpack('>H', data[-2:]) + + async def handshake(self): + '''Write the proxy handshake sequence.''' + if self.ip_address and self.ip_address.version == 6: + await self._socks5_handshake() + else: + await self._socks4_handshake() + + if self.debug: + address = (self.host, self.port) + self.log_info('successful connection via proxy to {}' + .format(util.address_string(address))) + + +class SocksProxy(util.LoggedClass): + + def __init__(self, host, port, loop=None): + '''Host can be an IPv4 address, IPv6 address, or a host name.''' + super().__init__() + self.host = host + self.port = port + self.ip_addr = None + self.loop = loop or asyncio.get_event_loop() + + async def create_connection(self, protocol_factory, host, port, ssl=None): + '''All arguments are as to asyncio's create_connection method.''' + if self.port is None: + proxy_ports = [9050, 9150, 1080] + else: + proxy_ports = [self.port] + + for proxy_port in proxy_ports: + address = (self.host, proxy_port) + sock = socket.socket() + sock.setblocking(False) + try: + await self.loop.sock_connect(sock, address) + except OSError as e: + if proxy_port == proxy_ports[-1]: + raise + continue + + socks = Socks(self.loop, sock, host, port) + try: + await socks.handshake() + if self.port is None: + self.ip_addr = sock.getpeername()[0] + self.port = proxy_port + self.logger.info('detected proxy at {} ({})' + .format(util.address_string(address), + self.ip_addr)) + break + except Exception as e: + sock.close() + raise + + hostname = host if ssl else None + return await self.loop.create_connection( + protocol_factory, ssl=ssl, sock=sock, server_hostname=hostname) diff --git a/lib/tx.py b/lib/tx.py index 31d2186..41e4583 100644 --- a/lib/tx.py +++ b/lib/tx.py @@ -2,7 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Transaction-related classes and functions.''' diff --git a/lib/util.py b/lib/util.py index e5c8674..fdd6dfe 100644 --- a/lib/util.py +++ b/lib/util.py @@ -2,20 +2,38 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Miscellaneous utility classes and functions.''' import array -import asyncio import inspect +from ipaddress import ip_address import logging import sys from collections import Container, Mapping - class LoggedClass(object): def __init__(self): @@ -202,3 +220,21 @@ def open_file(filename, create=False): if create: return open(filename, 'wb+') raise + +def open_truncate(filename): + '''Open the file name. Return its handle.''' + return open(filename, 'wb+') + + +def address_string(address): + '''Return an address as a correctly formatted string.''' + fmt = '{}:{:d}' + host, port = address + try: + host = ip_address(host) + except ValueError: + pass + else: + if host.version == 6: + fmt = '[{}]:{:d}' + return fmt.format(host, port) diff --git a/server/controller.py b/server/controller.py index db161c8..52694b2 100644 --- a/server/controller.py +++ b/server/controller.py @@ -6,7 +6,6 @@ # and warranty status of this software. import asyncio -import codecs import json import os import ssl @@ -20,7 +19,7 @@ from functools import partial import pylru -from lib.jsonrpc import JSONRPC, RPCError +from lib.jsonrpc import JSONRPC, JSONSessionBase, RPCError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor @@ -28,7 +27,6 @@ from server.daemon import Daemon, DaemonError from server.mempool import MemPool from server.peers import PeerManager from server.session import LocalRPC, ElectrumX -from server.version import VERSION class Controller(util.LoggedClass): @@ -88,8 +86,7 @@ class Controller(util.LoggedClass): 'address.get_proof address.listunspent ' 'block.get_header block.get_chunk estimatefee relayfee ' 'transaction.get transaction.get_merkle utxo.get_address'), - ('server', - 'banner donation_address'), + ('server', 'donation_address'), ] self.electrumx_handlers = {'.'.join([prefix, suffix]): getattr(self, suffix.replace('.', '_')) @@ -218,15 +215,21 @@ class Controller(util.LoggedClass): def on_future_done(self, future): '''Collect the result of a future after removing it from our set.''' callback = self.futures.pop(future) - if callback: - callback(future) - else: - try: + try: + if callback: + callback(future) + else: future.result() - except asyncio.CancelledError: - pass - except Exception: - self.log_error(traceback.format_exc()) + except asyncio.CancelledError: + pass + except Exception: + self.log_error(traceback.format_exc()) + + async def check_request_timeouts(self): + '''Regularly check pending JSON requests for timeouts.''' + while True: + await asyncio.sleep(30) + JSONSessionBase.timeout_check() async def wait_for_bp_catchup(self): '''Called when the block processor catches up.''' @@ -234,6 +237,7 @@ class Controller(util.LoggedClass): self.logger.info('block processor has caught up') self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.start_servers()) + self.ensure_future(self.check_request_timeouts()) self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.enqueue_delayed_sessions()) self.ensure_future(self.notify()) @@ -242,6 +246,8 @@ class Controller(util.LoggedClass): async def main_loop(self): '''Controller main loop.''' + if self.env.rpc_port is not None: + await self.start_server('RPC', 'localhost', self.env.rpc_port) self.ensure_future(self.bp.main_loop()) self.ensure_future(self.wait_for_bp_catchup()) @@ -269,6 +275,9 @@ class Controller(util.LoggedClass): for session in self.sessions: self.close_session(session) + # This might resolve "future never awaited" log + await asyncio.sleep(0) + # Cancel pending futures for future in self.futures: future.cancel() @@ -306,9 +315,7 @@ class Controller(util.LoggedClass): .format(kind, host, port)) async def start_servers(self): - '''Start RPC, TCP and SSL servers once caught up.''' - if self.env.rpc_port is not None: - await self.start_server('RPC', 'localhost', self.env.rpc_port) + '''Start TCP and SSL servers.''' self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' .format(self.env.session_timeout)) @@ -459,7 +466,7 @@ class Controller(util.LoggedClass): 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), - 'peers': self.peer_mgr.count(), + 'peers': self.peer_mgr.info(), 'requests': sum(s.count_pending_items() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), @@ -511,6 +518,38 @@ class Controller(util.LoggedClass): ]) return result + @staticmethod + def peers_text_lines(data): + '''A generator returning lines for a list of peers. + + data is the return value of rpc_peers().''' + def time_fmt(t): + if not t: + return 'Never' + return util.formatted_time(now - t) + + now = time.time() + fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>3} ' + '{:>3} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}') + yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min', + 'Max', 'Pruning', 'Last Conn', 'Last Try', + 'Tries', 'Source', 'IP Address') + for item in data: + features = item['features'] + yield fmt.format(item['host'][:30], + item['status'], + features['tcp_port'] or '', + features['ssl_port'] or '', + features['server_version'] or 'unknown', + features['protocol_min'], + features['protocol_max'], + features['pruning'] or '', + time_fmt(item['last_connect']), + time_fmt(item['last_try']), + item['try_count'], + item['source'][:20], + item['ip_addr'] or '') + @staticmethod def sessions_text_lines(data): '''A generator returning lines for a list of sessions. @@ -872,33 +911,6 @@ class Controller(util.LoggedClass): # Client RPC "server" command handlers - async def banner(self): - '''Return the server banner text.''' - banner = 'Welcome to Electrum!' - if self.env.banner_file: - try: - with codecs.open(self.env.banner_file, 'r', 'utf-8') as f: - banner = f.read() - except Exception as e: - self.log_error('reading banner file {}: {}' - .format(self.env.banner_file, e)) - else: - network_info = await self.daemon_request('getnetworkinfo') - version = network_info['version'] - major, minor = divmod(version, 1000000) - minor, revision = divmod(minor, 10000) - revision //= 100 - version = '{:d}.{:d}.{:d}'.format(major, minor, revision) - for pair in [ - ('$VERSION', VERSION), - ('$DAEMON_VERSION', version), - ('$DAEMON_SUBVERSION', network_info['subversion']), - ('$DONATION_ADDRESS', self.env.donation_address), - ]: - banner = banner.replace(*pair) - - return banner - def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address diff --git a/server/daemon.py b/server/daemon.py index e5d1fda..d52e973 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -20,6 +20,7 @@ import lib.util as util class DaemonError(Exception): '''Raised when the daemon returns an error in its results.''' + class Daemon(util.LoggedClass): '''Handles connections to a daemon at the given URL.''' diff --git a/server/db.py b/server/db.py index df92f38..6a3d4ef 100644 --- a/server/db.py +++ b/server/db.py @@ -23,6 +23,7 @@ from server.version import VERSION UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + class DB(util.LoggedClass): '''Simple wrapper of the backend database for querying. diff --git a/server/env.py b/server/env.py index b86cb95..afd8b1b 100644 --- a/server/env.py +++ b/server/env.py @@ -44,8 +44,14 @@ class Env(LoggedClass): self.rpc_port = self.integer('RPC_PORT', 8000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) + self.tor_banner_file = self.default('TOR_BANNER_FILE', + self.banner_file) self.anon_logs = self.default('ANON_LOGS', False) self.log_sessions = self.integer('LOG_SESSIONS', 3600) + # Tor proxy + # Python 3.5.3 - revert back to localhost? + self.tor_proxy_host = self.default('TOR_PROXY_HOST', '127.0.0.1') + self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified self.donation_address = self.default('DONATION_ADDRESS', '') self.db_engine = self.default('DB_ENGINE', 'leveldb') @@ -60,31 +66,33 @@ class Env(LoggedClass): self.irc = self.default('IRC', False) self.irc_nick = self.default('IRC_NICK', None) - self.identity = NetIdentity( + # Identities + main_identity = NetIdentity( self.default('REPORT_HOST', self.host), self.integer('REPORT_TCP_PORT', self.tcp_port) or None, self.integer('REPORT_SSL_PORT', self.ssl_port) or None, '' ) - self.tor_identity = NetIdentity( - self.default('REPORT_HOST_TOR', ''), # must be a string - self.integer('REPORT_TCP_PORT_TOR', - self.identity.tcp_port - if self.identity.tcp_port else - self.tcp_port) or None, - self.integer('REPORT_SSL_PORT_TOR', - self.identity.ssl_port - if self.identity.ssl_port else - self.ssl_port) or None, - '_tor' - ) - - if self.irc: - if not self.identity.host.strip(): - raise self.Error('IRC host is empty') - if self.identity.tcp_port == self.identity.ssl_port: - raise self.Error('IRC TCP and SSL ports are the same') - + if not main_identity.host.strip(): + raise self.Error('IRC host is empty') + if main_identity.tcp_port == main_identity.ssl_port: + raise self.Error('IRC TCP and SSL ports are the same') + + self.identities = [main_identity] + tor_host = self.default('REPORT_HOST_TOR', '') + if tor_host.endswith('.onion'): + self.identities.append(NetIdentity( + tor_host, + self.integer('REPORT_TCP_PORT_TOR', + main_identity.tcp_port + if main_identity.tcp_port else + self.tcp_port) or None, + self.integer('REPORT_SSL_PORT_TOR', + main_identity.ssl_port + if main_identity.ssl_port else + self.ssl_port) or None, + '_tor', + )) def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/irc.py b/server/irc.py index 1df04c8..0457a3d 100644 --- a/server/irc.py +++ b/server/irc.py @@ -50,7 +50,7 @@ class IRC(LoggedClass): # Register handlers for events we're interested in reactor = irc_client.Reactor() - for event in 'welcome join quit whoreply disconnect'.split(): + for event in 'welcome join whoreply disconnect'.split(): reactor.add_global_handler(event, getattr(self, 'on_' + event)) # Note: Multiple nicks in same channel will trigger duplicate events @@ -96,12 +96,6 @@ class IRC(LoggedClass): if match: connection.who(match.group(1)) - def on_quit(self, connection, event): - '''Called when someone leaves our channel.''' - match = self.peer_regexp.match(event.source) - if match: - self.peer_mgr.remove_irc_peer(match.group(1)) - def on_whoreply(self, connection, event): '''Called when a response to our who requests arrives. @@ -111,8 +105,8 @@ class IRC(LoggedClass): nick = event.arguments[4] if nick.startswith(self.prefix): line = event.arguments[6].split() - hostname, details = line[1], line[2:] - self.peer_mgr.add_irc_peer(nick, hostname, details) + hp_string = ' '.join(line[1:]) # hostname, ports, version etc. + self.peer_mgr.add_irc_peer(nick, hp_string) class IrcClient(object): diff --git a/server/peers.py b/server/peers.py index c7fdaa2..6ddf77b 100644 --- a/server/peers.py +++ b/server/peers.py @@ -7,15 +7,184 @@ '''Peer management.''' +import ast import asyncio -import socket -from collections import namedtuple +import random +import ssl +import time +from collections import defaultdict, Counter +from functools import partial +from lib.jsonrpc import JSONSession +from lib.peer import Peer +from lib.socks import SocksProxy import lib.util as util from server.irc import IRC +import server.version as version -IRCPeer = namedtuple('IRCPeer', 'ip_addr host details') +PEERS_FILE = 'peers' +PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) +STALE_SECS = 86400 +WAKEUP_SECS = 300 + + +def peer_from_env(env): + '''Return ourself as a peer from the environment settings.''' + main_identity = env.identities[0] + hosts = {identity.host : {'tcp_port': identity.tcp_port, + 'ssl_port': identity.ssl_port} + for identity in env.identities} + features = { + 'hosts': hosts, + 'pruning': None, + 'server_version': version.VERSION, + 'protocol_min': version.PROTOCOL_MIN, + 'protocol_max': version.PROTOCOL_MAX, + 'genesis_hash': env.coin.GENESIS_HASH, + } + + return Peer(main_identity.host, features, 'env') + + +class PeerSession(JSONSession): + '''An outgoing session to a peer.''' + + def __init__(self, peer, peer_mgr, kind): + super().__init__() + self.max_send = 0 + self.peer = peer + self.peer_mgr = peer_mgr + self.kind = kind + self.failed = False + self.log_prefix = '[{}] '.format(self.peer) + + def have_pending_items(self): + self.peer_mgr.ensure_future(self.process_pending_items()) + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + super().connection_made(transport) + self.log_prefix = '[{}] '.format(str(self.peer)[:25]) + + # Update IP address + if not self.peer.is_tor: + peer_info = self.peer_info() + if peer_info: + self.peer.ip_addr = peer_info[0] + + # Collect data + proto_ver = (version.PROTOCOL_MIN, version.PROTOCOL_MAX) + self.send_request(self.on_version, 'server.version', + [version.VERSION, proto_ver]) + self.send_request(self.on_peers_subscribe, 'server.peers.subscribe') + self.send_request(self.on_features, 'server.features') + + def connection_lost(self, exc): + '''Handle disconnection.''' + super().connection_lost(exc) + self.peer_mgr.connection_lost(self) + + def on_peers_subscribe(self, result, error): + '''Handle the response to the peers.subcribe message.''' + if error: + self.failed = True + self.log_error('server.peers.subscribe: {}'.format(error)) + else: + self.check_remote_peers(result) + self.close_if_done() + + def check_remote_peers(self, updates): + '''When a peer gives us a peer update. + + Each update is expected to be of the form: + [ip_addr, hostname, ['v1.0', 't51001', 's51002']] + + Return True if we're in the list of peers. + ''' + try: + real_names = [' '.join([u[1]] + u[2]) for u in updates] + peers = [Peer.from_real_name(real_name, str(self.peer)) + for real_name in real_names] + except Exception: + self.log_error('bad server.peers.subscribe response') + return False + + self.peer_mgr.add_peers(peers) + my = self.peer_mgr.myself + for peer in my.matches(peers): + if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port: + return + + # Announce ourself if not present + self.log_info('registering with server.add_peer') + self.send_request(self.on_add_peer, 'server.add_peer', [my.features]) + + def on_add_peer(self, result, error): + '''Handle the response to the add_peer message.''' + self.close_if_done() + + def on_features(self, features, error): + # Several peers don't implement this. If they do, check they are + # the same network with the genesis hash. + verified = False + if not error and isinstance(features, dict): + forget = False + our_hash = self.peer_mgr.env.coin.GENESIS_HASH + their_hash = features.get('genesis_hash') + if their_hash: + verified = their_hash == our_hash + forget = their_hash != our_hash + if forget: + self.failed = True + self.peer.mark_bad() + self.log_warning('incorrect genesis hash') + else: + self.peer.update_features(features) + # For legacy peers not implementing features, check their height + # as a proxy to determining they're on our network + if not verified: + self.send_request(self.on_headers, 'blockchain.headers.subscribe') + self.close_if_done() + + def on_headers(self, result, error): + '''Handle the response to the version message.''' + if error or not isinstance(result, dict): + self.failed = True + self.log_error('bad blockchain.headers.subscribe response') + else: + our_height = self.peer_mgr.controller.bp.db_height + their_height = result.get('block_height') + if (not isinstance(their_height, int) or + abs(our_height - their_height) > 5): + self.failed = True + self.peer.mark_bad() + self.log_warning('bad height {}'.format(their_height)) + self.close_if_done() + + def on_version(self, result, error): + '''Handle the response to the version message.''' + if error: + self.failed = True + self.log_error('server.version returned an error') + elif isinstance(result, str): + self.peer.server_version = result + self.peer.features['server_version'] = result + self.close_if_done() + + def close_if_done(self): + if not self.has_pending_requests(): + is_good = not self.failed + self.peer.last_connect = time.time() + self.peer_mgr.set_connection_status(self.peer, is_good) + if is_good: + if self.peer.is_tor: + self.log_info('verified via {} over Tor'.format(self.kind)) + else: + self.log_info('verified via {} at {}' + .format(self.kind, + self.peer_addr(anon=False))) + self.close_connection() class PeerManager(util.LoggedClass): @@ -24,88 +193,325 @@ class PeerManager(util.LoggedClass): Attempts to maintain a connection with up to 8 peers. Issues a 'peers.subscribe' RPC to them and tells them our data. ''' - PROTOCOL_VERSION = '1.0' - def __init__(self, env, controller): super().__init__() + # Initialise the Peer class + Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS self.env = env self.controller = controller + self.loop = controller.loop self.irc = IRC(env, self) - self.pruning = None - self._identities = [] - # Keyed by nick - self.irc_peers = {} - self._identities.append(env.identity) - if env.tor_identity.host.endswith('.onion'): - self._identities.append(env.tor_identity) - - def real_name(self, host, protocol_version, tcp_port, ssl_port): - '''Real name as used on IRC.''' - default_ports = self.env.coin.PEER_DEFAULT_PORTS - - def port_text(letter, port): - if port == default_ports.get(letter): - return letter + self.myself = peer_from_env(env) + # value is max outgoing connections at a time + self.semaphore = asyncio.BoundedSemaphore(value=8) + self.retry_event = asyncio.Event() + # Peers have one entry per hostname. Once connected, the + # ip_addr property is either None, an onion peer, or the + # IP address that was connected to. Adding a peer will evict + # any other peers with the same host name or IP address. + self.peers = set() + self.onion_peers = [] + self.last_tor_retry_time = 0 + self.tor_proxy = SocksProxy(env.tor_proxy_host, env.tor_proxy_port, + loop=self.loop) + self.import_peers() + + def info(self): + '''The number of peers.''' + self.set_peer_statuses() + counter = Counter(peer.status for peer in self.peers) + return { + 'bad': counter[PEER_BAD], + 'good': counter[PEER_GOOD], + 'never': counter[PEER_NEVER], + 'stale': counter[PEER_STALE], + 'total': len(self.peers), + } + + def set_peer_statuses(self): + '''Set peer statuses.''' + cutoff = time.time() - STALE_SECS + for peer in self.peers: + if peer.bad: + peer.status = PEER_BAD + elif peer.last_connect > cutoff: + peer.status = PEER_GOOD + elif peer.last_connect: + peer.status = PEER_STALE + else: + peer.status = PEER_NEVER + + def rpc_data(self): + '''Peer data for the peers RPC method.''' + self.set_peer_statuses() + + descs = ['good', 'stale', 'never', 'bad'] + def peer_data( peer): + data = peer.serialize() + data['status'] = descs[peer.status] + return data + + def peer_key(peer): + return (peer.bad, -peer.last_connect) + + return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] + + def add_peers(self, peers, limit=3, check_ports=False, source=None): + '''Add a limited number of peers that are not already present.''' + retry = False + new_peers = [] + for peer in peers: + matches = peer.matches(self.peers) + if not matches: + new_peers.append(peer) + elif check_ports: + for match in matches: + if match.check_ports(peer): + self.logger.info('ports changed for {}'.format(peer)) + retry = True + + if new_peers: + retry = True + source = source or new_peers[0].source + if limit: + random.shuffle(new_peers) + use_peers = new_peers[:limit] + else: + use_peers = new_peers + self.logger.info('accepted {:d}/{:d} new peers of {:d} from {}' + .format(len(use_peers), len(new_peers), + len(peers), source)) + self.peers.update(use_peers) + + if retry: + self.retry_event.set() + + def on_add_peer(self, features, source): + '''Add peers from an incoming connection.''' + peers = Peer.peers_from_features(features, source) + if peers: + self.log_info('add_peer request received from {}' + .format(peers[0].host)) + self.add_peers(peers, check_ports=True) + return bool(peers) + + def on_peers_subscribe(self, is_tor): + '''Returns the server peers as a list of (ip, host, details) tuples. + + We return all peers we've connected to in the last day. + Additionally, if we don't have onion routing, we return up to + three randomly selected onion servers. + ''' + cutoff = time.time() - STALE_SECS + recent = [peer for peer in self.peers + if peer.last_connect > cutoff + and not peer.bad and peer.is_public] + onion_peers = [] + + # Always report ourselves if valid (even if not public) + peers = set() + if self.myself.last_connect > cutoff: + peers.add(self.myself) + + # Bucket the clearnet peers and select one from each + buckets = defaultdict(list) + for peer in recent: + if peer.is_tor: + onion_peers.append(peer) else: - return letter + str(port) + buckets[peer.bucket()].append(peer) + peers.update(random.choice(bpeers) for bpeers in buckets.values()) + + # Add up to 20% onion peers (but up to 10 is OK anyway) + onion_peers = onion_peers or self.onion_peers + random.shuffle(onion_peers) + max_onion = 50 if is_tor else max(10, len(peers) // 4) + + peers.update(onion_peers[:max_onion]) + + return [peer.to_tuple() for peer in peers] + + def serialize(self): + serialized_peers = [peer.serialize() for peer in self.peers + if not peer.bad] + data = (1, serialized_peers) # version 1 + return repr(data) - parts = [host, 'v' + protocol_version] - for letter, port in (('s', ssl_port), ('t', tcp_port)): - if port: - parts.append(port_text(letter, port)) - return ' '.join(parts) + def write_peers_file(self): + with util.open_truncate(PEERS_FILE) as f: + f.write(self.serialize().encode()) + self.logger.info('wrote out {:,d} peers'.format(len(self.peers))) - def irc_name_pairs(self): - return [(self.real_name(identity.host, self.PROTOCOL_VERSION, - identity.tcp_port, identity.ssl_port), - identity.nick_suffix) - for identity in self._identities] + def read_peers_file(self): + try: + with util.open_file(PEERS_FILE, create=True) as f: + data = f.read(-1).decode() + except Exception as e: + self.logger.error('error reading peers file {}'.format(e)) + else: + if data: + version, items = ast.literal_eval(data) + if version == 1: + peers = [Peer.deserialize(item) for item in items] + self.add_peers(peers, source='peers file', limit=None) + + def import_peers(self): + '''Import hard-coded peers from a file or the coin defaults.''' + self.add_peers([self.myself]) + coin_peers = self.env.coin.PEERS + self.onion_peers = [Peer.from_real_name(rn, 'coins.py') + for rn in coin_peers if '.onion ' in rn] + + # If we don't have many peers in the peers file, add + # hard-coded ones + self.read_peers_file() + if len(self.peers) < 5: + peers = [Peer.from_real_name(real_name, 'coins.py') + for real_name in coin_peers] + self.add_peers(peers, limit=None) + + def connect_to_irc(self): + '''Connect to IRC if not disabled.''' + if self.env.irc and self.env.coin.IRC_PREFIX: + pairs = [(self.myself.real_name(ident.host), ident.nick_suffix) + for ident in self.env.identities] + self.ensure_future(self.irc.start(pairs)) + else: + self.logger.info('IRC is disabled') - def identities(self): - '''Return a list of network identities of this server.''' - return self._identities + def add_irc_peer(self, nick, real_name): + '''Add an IRC peer.''' + peer = Peer.from_real_name(real_name, '{}'.format(nick)) + self.add_peers([peer]) def ensure_future(self, coro, callback=None): '''Schedule the coro to be run.''' return self.controller.ensure_future(coro, callback=callback) async def main_loop(self): - '''Not a loop for now...''' - if self.env.irc: - self.ensure_future(self.irc.start(self.irc_name_pairs())) - else: - self.logger.info('IRC is disabled') + '''Main loop performing peer maintenance. This includes - def dns_lookup_peer(self, nick, hostname, details): + 1) Forgetting unreachable peers. + 2) Verifying connectivity of new peers. + 3) Retrying old peers at regular intervals. + ''' + self.connect_to_irc() try: - ip_addr = None - try: - ip_addr = socket.gethostbyname(hostname) - except socket.error: - pass # IPv6? - ip_addr = ip_addr or hostname - self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) - self.logger.info('new IRC peer {} at {} ({})' - .format(nick, hostname, details)) - except UnicodeError: - # UnicodeError comes from invalid domains (issue #68) - self.logger.info('IRC peer domain {} invalid'.format(hostname)) - - def add_irc_peer(self, *args): - '''Schedule DNS lookup of peer.''' - self.controller.schedule_executor(self.dns_lookup_peer, *args) - - def remove_irc_peer(self, nick): - '''Remove a peer from our IRC peers map.''' - self.logger.info('removing IRC peer {}'.format(nick)) - self.irc_peers.pop(nick, None) - - def count(self): - return len(self.irc_peers) + while True: + timeout = self.loop.call_later(WAKEUP_SECS, + self.retry_event.set) + await self.retry_event.wait() + self.retry_event.clear() + timeout.cancel() + await self.retry_peers() + finally: + self.write_peers_file() - def rpc_data(self): - return self.irc_peers + def is_coin_onion_peer(self, peer): + '''Return true if this peer is a hard-coded onion peer.''' + return peer.is_tor and any(peer.host in real_name + for real_name in self.env.coin.PEERS) + + async def retry_peers(self): + '''Retry peers that are close to getting stale.''' + # Exponential backoff of retries + now = time.time() + nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2 + + def retry_peer(peer): + # Try some Tor at startup to determine the proxy so we can + # serve the right banner file + if self.last_tor_retry_time == 0 and self.is_coin_onion_peer(peer): + return True + # Retry a peer whose ports might have updated + if peer.other_port_pairs: + return True + # Retry a good connection if it is about to turn stale + if peer.try_count == 0: + return peer.last_connect < nearly_stale_time + # Retry a failed connection if enough time has passed + return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count + + peers = [peer for peer in self.peers if retry_peer(peer)] + + # If we don't have a tor proxy drop tor peers, but retry + # occasionally + if self.tor_proxy.port is None: + if now < self.last_tor_retry_time + 3600: + peers = [peer for peer in peers if not peer.is_tor] + elif any(peer.is_tor for peer in peers): + self.last_tor_retry_time = now + + for peer in peers: + peer.last_try = time.time() + peer.try_count += 1 + pairs = peer.connection_port_pairs() + if peer.bad or not pairs: + self.maybe_forget_peer(peer) + else: + await self.semaphore.acquire() + self.retry_peer(peer, pairs) + + def retry_peer(self, peer, port_pairs): + kind, port = port_pairs[0] + # Python 3.5.3: use PROTOCOL_TLS + sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) if kind == 'SSL' else None + + if peer.is_tor: + create_connection = self.tor_proxy.create_connection + else: + create_connection = self.loop.create_connection + + protocol_factory = partial(PeerSession, peer, self, kind) + coro = create_connection(protocol_factory, peer.host, port, ssl=sslc) + callback = partial(self.connection_done, peer, port_pairs) + self.ensure_future(coro, callback) + + def connection_done(self, peer, port_pairs, future): + '''Called when a connection attempt succeeds or fails. + + If failed, log it and try remaining port pairs. If none, + release the connection count semaphore. + ''' + exception = future.exception() + if exception: + kind, port = port_pairs[0] + self.logger.info('failed connecting to {} at {} port {:d}: {}' + .format(peer, kind, port, exception)) + port_pairs = port_pairs[1:] + if port_pairs: + self.retry_peer(peer, port_pairs) + else: + self.set_connection_status(peer, False) + self.semaphore.release() + + def connection_lost(self, session): + '''Called by the peer session when disconnected.''' + self.semaphore.release() + + def set_connection_status(self, peer, good): + '''Called when a connection succeeded or failed.''' + if good: + peer.try_count = 0 + peer.source = 'peer' + # Remove matching IP addresses + for match in peer.matches(self.peers): + if match != peer and peer.host == peer.ip_addr: + self.peers.remove(match) + else: + self.maybe_forget_peer(peer) + + def maybe_forget_peer(self, peer): + '''Forget the peer if appropriate, e.g. long-term unreachable.''' + if peer.bad: + forget = peer.last_connect < time.time() - STALE_SECS // 2 + else: + try_limit = 10 if peer.last_connect else 3 + forget = peer.try_count >= try_limit + + if forget: + desc = 'bad' if peer.bad else 'unreachable' + self.logger.info('forgetting {} peer: {}'.format(desc, peer)) + self.peers.discard(peer) - def on_peers_subscribe(self): - '''Returns the server peers as a list of (ip, host, details) tuples.''' - return list(self.irc_peers.values()) + return forget diff --git a/server/session.py b/server/session.py index 9f16b87..490fc1c 100644 --- a/server/session.py +++ b/server/session.py @@ -7,12 +7,13 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' +import codecs import time from functools import partial from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2 from server.daemon import DaemonError -from server.version import VERSION +import server.version as version class SessionBase(JSONSession): @@ -32,6 +33,7 @@ class SessionBase(JSONSession): self.env = controller.env self.daemon = self.bp.daemon self.client = 'unknown' + self.protocol_version = '1.0' self.anon_logs = self.env.anon_logs self.last_delay = 0 self.txs_sent = 0 @@ -42,6 +44,7 @@ class SessionBase(JSONSession): self.bw_time = self.start_time self.bw_interval = 3600 self.bw_used = 0 + self.peer_added = False def have_pending_items(self): '''Called each time the pending item queue goes from empty to having @@ -74,6 +77,7 @@ class SessionBase(JSONSession): def connection_lost(self, exc): '''Handle client disconnection.''' + super().connection_lost(exc) msg = '' if self.pause: msg += ' whilst paused' @@ -116,6 +120,8 @@ class ElectrumX(SessionBase): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.transaction.broadcast': self.transaction_broadcast, + 'server.add_peer': self.add_peer, + 'server.banner': self.banner, 'server.features': self.server_features, 'server.peers.subscribe': self.peers_subscribe, 'server.version': self.server_version, @@ -171,9 +177,19 @@ class ElectrumX(SessionBase): self.subscribe_height = True return self.height() + def add_peer(self, features): + '''Add a peer.''' + if self.peer_added: + return False + peer_mgr = self.controller.peer_mgr + peer_info = self.peer_info() + source = peer_info[0] if peer_info else 'unknown' + self.peer_added = peer_mgr.on_add_peer(features, source) + return self.peer_added + def peers_subscribe(self): '''Return the server peers as a list of (ip, host, details) tuples.''' - return self.controller.peer_mgr.on_peers_subscribe() + return self.controller.peer_mgr.on_peers_subscribe(self.is_tor()) async def address_subscribe(self, address): '''Subscribe to an address. @@ -190,18 +206,50 @@ class ElectrumX(SessionBase): def server_features(self): '''Returns a dictionary of server features.''' - peer_mgr = self.controller.peer_mgr - hosts = {identity.host: { - 'tcp_port': identity.tcp_port, - 'ssl_port': identity.ssl_port, - } for identity in peer_mgr.identities()} - - return { - 'hosts': hosts, - 'pruning': peer_mgr.pruning, - 'protocol_version': peer_mgr.PROTOCOL_VERSION, - 'server_version': VERSION, - } + return self.controller.peer_mgr.myself.features + + def is_tor(self): + '''Try to detect if the connection is to a tor hidden service we are + running.''' + tor_proxy = self.controller.peer_mgr.tor_proxy + peer_info = self.peer_info() + return peer_info and peer_info[0] == tor_proxy.ip_addr + + async def replaced_banner(self, banner): + network_info = await self.controller.daemon_request('getnetworkinfo') + ni_version = network_info['version'] + major, minor = divmod(ni_version, 1000000) + minor, revision = divmod(minor, 10000) + revision //= 100 + daemon_version = '{:d}.{:d}.{:d}'.format(major, minor, revision) + for pair in [ + ('$VERSION', version.VERSION), + ('$DAEMON_VERSION', daemon_version), + ('$DAEMON_SUBVERSION', network_info['subversion']), + ('$DONATION_ADDRESS', self.env.donation_address), + ]: + banner = banner.replace(*pair) + return banner + + async def banner(self): + '''Return the server banner text.''' + banner = 'Welcome to Electrum!' + + if self.is_tor(): + banner_file = self.env.tor_banner_file + else: + banner_file = self.env.banner_file + if banner_file: + try: + with codecs.open(banner_file, 'r', 'utf-8') as f: + banner = f.read() + except Exception as e: + self.log_error('reading banner file {}: {}' + .format(banner_file, e)) + else: + banner = await self.replaced_banner(banner) + + return banner def server_version(self, client_name=None, protocol_version=None): '''Returns the server version as a string. @@ -213,7 +261,7 @@ class ElectrumX(SessionBase): self.client = str(client_name)[:17] if protocol_version is not None: self.protocol_version = protocol_version - return VERSION + return version.VERSION async def transaction_broadcast(self, raw_tx): '''Broadcast a raw transaction to the network. diff --git a/server/version.py b/server/version.py index bb177d8..b1c7dc6 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1,3 @@ -VERSION = "ElectrumX 0.10.19" +VERSION = "ElectrumX 0.10.p6" +PROTOCOL_MIN = '1.0' +PROTOCOL_MAX = '1.0'