diff --git a/LICENCE b/LICENCE index db7d8c9..eac7aaf 100644 --- a/LICENCE +++ b/LICENCE @@ -1,4 +1,4 @@ -Copyright (c) 2016, Neil Booth +Copyright (c) 2016-2017, Neil Booth All rights reserved. diff --git a/README.rst b/README.rst index f981cf2..f0fc274 100644 --- a/README.rst +++ b/README.rst @@ -8,7 +8,7 @@ ElectrumX - Reimplementation of electrum-server =============================================== :Licence: MIT - :Language: Python (>= 3.5) + :Language: Python (>= 3.5.1) :Author: Neil Booth Getting Started @@ -42,6 +42,7 @@ Features - Daemon failover. More than one daemon can be specified, and ElectrumX will failover round-robin style if the current one fails for any reason. +- peer discovery protocol removes need for IRC - Coin abstraction makes compatible altcoin and testnet support easy. Motivation @@ -113,8 +114,8 @@ and associated complications. Roadmap Pre-1.0 =============== -- minor code cleanups. -- implement simple protocol to discover peers without resorting to IRC. +- minor code cleanups +- fixes for a couple of outstanding issues Roadmap Post-1.0 ================ @@ -128,18 +129,33 @@ Roadmap Post-1.0 Database Format =============== -The database format of ElectrumX is unlikely to change from the 0.10.0 -version prior to the release of 1.0. +The database format of ElectrumX will not change from the 0.10.0 +version for the release of 1.0. ChangeLog ========= +Version 0.11.0 +-------------- + +* implementation of `docs/PEER_DISCOVERY.rst`_ for discovery of server + peers without using IRC. Closes `#104`_. Since all testnet peers + are ElectrumX servers, IRC advertising is now disabled on bitcoin + testnet. + + Thanks to bauerj, hsmiths and JWU42 for their help testing these + changes over the last month. +* you can now specify a tor proxy (or have it autodetected if local), + and if an incoming connection seems to be from the proxy a + tor-specific banner file is served. See **TOR_BANNER_FILE** in + `docs/ENVIRONMENT.rst`_. + Version 0.10.19 --------------- * update `docs/PEER_DISCOVERY.rst`_ -* accept IPv6 addresses in DAEMON_URL (fixes #126) +* accept IPv6 addresses in DAEMON_URL (fixes `#126`_) Version 0.10.18 --------------- @@ -312,7 +328,10 @@ stability please stick with the 0.9 series. .. _#101: https://github.com/kyuupichan/electrumx/issues/101 .. _#102: https://github.com/kyuupichan/electrumx/issues/102 .. _#103: https://github.com/kyuupichan/electrumx/issues/103 +.. _#104: https://github.com/kyuupichan/electrumx/issues/104 .. _#110: https://github.com/kyuupichan/electrumx/issues/110 .. _#111: https://github.com/kyuupichan/electrumx/issues/111 +.. _#126: https://github.com/kyuupichan/electrumx/issues/126 .. _docs/HOWTO.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/HOWTO.rst +.. _docs/ENVIRONMENT.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/ENVIRONMENT.rst .. _docs/PEER_DISCOVERY.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/PEER_DISCOVERY.rst diff --git a/docs/ENVIRONMENT.rst b/docs/ENVIRONMENT.rst index 07c5204..e799b2a 100644 --- a/docs/ENVIRONMENT.rst +++ b/docs/ENVIRONMENT.rst @@ -122,6 +122,11 @@ These environment variables are optional: + **$DONATION_ADDRESS** is replaced with the address from the **DONATION_ADDRESS** environment variable. +* **TOR_BANNER_FILE** + + As for **BANNER_FILE** (which is also the default) but shown to + incoming connections believed to be to your Tor hidden service. + * **ANON_LOGS** Set to anything non-empty to replace IP addresses in logs with @@ -207,6 +212,33 @@ raise them. functioning Electrum clients by default will send pings roughly every 60 seconds. +TOR +--- + +In response to the `server.peers.subscribe` RPC call, ElectrumX will +only return peer servers that is has recently connected to and +verified basic functionality. + +If you are not running a Tor proxy ElectrumX will be unable to connect +to onion server peers, in which case rather than returning no onion +peers it will fall back to a hard-coded list. + +To give incoming clients a full range of onion servers you will need +to be running a Tor proxy for ElectrumX to use. + +* **TOR_PROXY_HOST** + + The host where the Tor proxy is running. Defaults to *127.0.0.1*. + If you use a hostname here rather than an IP address, you must have + Python version >= 3.5.3, Python 3.5.2 will **not** work. + +* **TOR_PROXY_PORT** + + The port on which the Tor proxy is running. If not set, ElectrumX + will autodetect any proxy running on the usual ports 9050 (Tor), + 9150 (Tor browser bundle) and 1080 (socks). + + IRC --- @@ -255,6 +287,11 @@ connectivity on IRC: unless it is '0', otherwise **SSL_PORT**. '0' disables publishing the port. + **NOTE**: Certificate-Authority signed certificates don't work over + Tor, so you should set **REPORT_SSL_PORT_TOR** to 0 if yours is not + self-signed. + + Cache ----- diff --git a/docs/PEER_DISCOVERY.rst b/docs/PEER_DISCOVERY.rst index 1187841..a69b9c3 100644 --- a/docs/PEER_DISCOVERY.rst +++ b/docs/PEER_DISCOVERY.rst @@ -160,8 +160,8 @@ Unknown keys should be silently ignored. * **protocol_min** Strings that are the minimum and maximum Electrum protcol versions - this server speaks. Should be the same as what would suffix the - letter **v** in the IRC real name. Example: "1.1". + this server speaks. The maximum value should be the same as what + would suffix the letter **v** in the IRC real name. Example: "1.1". * **pruning** diff --git a/docs/PROTOCOL.rst b/docs/PROTOCOL.rst index e5e372d..5f3151e 100644 --- a/docs/PROTOCOL.rst +++ b/docs/PROTOCOL.rst @@ -723,8 +723,8 @@ Get a list of features and services supported by the server. * **pruning** - The history pruning limit of the server. If the server does not - prune return *null*. + The history pruning limit of the server as an integer. If the + server does not prune return *null*. **Example Response** diff --git a/docs/RPC-INTERFACE.rst b/docs/RPC-INTERFACE.rst index 4816838..bb0aed9 100644 --- a/docs/RPC-INTERFACE.rst +++ b/docs/RPC-INTERFACE.rst @@ -62,10 +62,10 @@ The following commands are available: Sessions are put into groups, primarily as an anti-DoS measure. Initially all connections made within a period of time are put in the same group. High bandwidth usage by a member of a group - deprioritizes itself, and all members of its group to a lesser + deprioritizes that session, and all members of its group to a lesser extent. Low-priority sessions have their requests served after higher priority sessions. ElectrumX will start delaying responses - to a sessions if it becomes sufficiently deprioritized. + to a session if it becomes sufficiently deprioritized. * **sessions** @@ -125,7 +125,7 @@ The following commands are available: ElectrumX initiates the socket close process for the passed sessions. Whilst most connections close quickly, it can take - several minutes for Python to close some SSL connections down. + several minutes for Python to shut some SSL connections down. * **log** @@ -153,22 +153,25 @@ The following commands are available: Returns a list of peer electrum servers. This command takes no arguments. - Currently this is data gleaned from an IRC session. + Currently peer data is obtained via a peer discovery protocol; it + used to be taken from IRC. * **daemon_url** - This command takes an option argument that is interpreted - identically to the **DAEMON_URL** environment variable. If default - value of the argument is the **DAEMON_URL** environment variable. + This command takes an optional argument that is interpreted + identically to the **DAEMON_URL** environment variable. If omitted, + the default argument value is the process's **DAEMON_URL** + environment variable. - The command replaces the daemon's URL at run-time, and rotates to the - first in the list. + This command replaces the daemon's URL at run-time, and also + forecefully rotates to the first URL in the list. - For example, in case ElectrumX has rotated to a secondary daemon and - you want to revert to the first after fixing the issue, call this - command without an argument. + For example, in case ElectrumX has previously failed over to a + secondary daemon and you want to revert to the primary having + resolved the connectivity issue, invoking this command without an + argument will have that effect. * **reorg** Force a block chain reorg. This command takes an optional - argument - the number of blocks to reorg - that defaults to 3. + argument - the number of blocks to reorg - which defaults to 3. diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 122a7df..8cffa3b 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -40,12 +40,9 @@ class RPCClient(JSONSession): self.send_request(handler, method, params) def handle_response(self, method, result, error): - if method in ('groups', 'sessions') and not error: - if method == 'groups': - lines = Controller.groups_text_lines(result) - else: - lines = Controller.sessions_text_lines(result) - for line in lines: + if method in ('groups', 'peers', 'sessions') and not error: + lines_func = getattr(Controller, '{}_text_lines'.format(method)) + for line in lines_func(result): print(line) elif error: print('error: {} (code {:d})' @@ -74,7 +71,7 @@ def rpc_send_and_wait(port, method, params, timeout=15): def main(): '''Send the RPC command to the server and print the result.''' - parser = argparse.ArgumentParser('Send electrumx an RPC command' ) + parser = argparse.ArgumentParser('Send electrumx an RPC command') parser.add_argument('-p', '--port', metavar='port_num', type=int, help='RPC port number') parser.add_argument('command', nargs=1, default=[], diff --git a/electrumx_server.py b/electrumx_server.py index f23542f..dd57f2a 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -25,6 +25,7 @@ SUPPRESS_MESSAGES = [ 'Fatal write error on socket transport', ] + def main_loop(): '''Start the server.''' if os.geteuid() == 0: @@ -32,7 +33,7 @@ def main_loop(): 'account and use that') loop = asyncio.get_event_loop() - #loop.set_debug(True) + # loop.set_debug(True) def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' @@ -43,7 +44,7 @@ def main_loop(): def on_exception(loop, context): '''Suppress spurious messages it appears we cannot control.''' message = context.get('message') - if not message in SUPPRESS_MESSAGES: + if message not in SUPPRESS_MESSAGES: if not ('task' in context and 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) diff --git a/lib/coins.py b/lib/coins.py index 11e4f04..c9ad2ea 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Module providing coin abstraction. @@ -37,6 +55,7 @@ class Coin(object): RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?') VALUE_PER_COIN = 100000000 CHUNK_SIZE = 2016 + IRC_PREFIX = None IRC_SERVER = "irc.freenode.net" IRC_PORT = 6667 HASHX_LEN = 11 @@ -50,10 +69,10 @@ class Coin(object): Raise an exception if unrecognised.''' req_attrs = ('TX_COUNT', 'TX_COUNT_HEIGHT', 'TX_PER_BLOCK', - 'IRC_CHANNEL', 'IRC_PREFIX') + 'IRC_CHANNEL') for coin in util.subclasses(Coin): - if (coin.NAME.lower() == name.lower() - and coin.NET.lower() == net.lower()): + if (coin.NAME.lower() == name.lower() and + coin.NET.lower() == net.lower()): missing = [attr for attr in req_attrs if not hasattr(coin, attr)] if missing: @@ -285,25 +304,28 @@ class Bitcoin(Coin): IRC_CHANNEL = "#electrum" RPC_PORT = 8332 PEERS = [ - '4cii7ryno5j3axe4.onion t' 'btc.smsys.me s995', 'ca6ulp2j2mpsft3y.onion s t', 'electrum.be s t', - 'electrum.trouth.net s t', + 'electrum.trouth.net p10000 s t', 'electrum.vom-stausee.de s t', - 'electrum3.hachre.de s t', + 'electrum3.hachre.de p10000 s t', 'electrum.hsmiths.com s t', 'erbium1.sytes.net s t', - 'h.1209k.com s t', + 'fdkbwjykvl2f3hup.onion p10000 s t', + 'h.1209k.com p10000 s t', 'helicarrier.bauerj.eu s t', + 'hsmiths4fyqlw5xw.onion s t', 'ozahtqwp25chjdjd.onion s t', 'us11.einfachmalnettsein.de s t', + 'ELEX01.blackpole.online s t', ] class BitcoinTestnet(Bitcoin): SHORTNAME = "XTN" NET = "testnet" + IRC_PREFIX = None XPUB_VERBYTES = bytes.fromhex("043587cf") XPRV_VERBYTES = bytes.fromhex("04358394") P2PKH_VERBYTE = 0x6f @@ -315,16 +337,15 @@ class BitcoinTestnet(Bitcoin): TX_COUNT = 12242438 TX_COUNT_HEIGHT = 1035428 TX_PER_BLOCK = 21 - IRC_PREFIX = "ET_" RPC_PORT = 18332 PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'} PEERS = [ 'electrum.akinbo.org s t', 'he36kyperp3kbuxu.onion s t', 'electrum-btc-testnet.petrkr.net s t', - 'testnet.hsmiths.com t53011 s53012', - 'hsmithsxurybd7uh.onion t53011', - 'testnet.not.fyi s t', + 'testnet.hsmiths.com t53011', + 'hsmithsxurybd7uh.onion t53011 s53012', + 'ELEX05.blackpole.online t52001 s52002', ] @@ -477,7 +498,7 @@ class DashTestnet(Dash): TX_PER_BLOCK = 1 RPC_PORT = 19998 IRC_PREFIX = "d_" - PEER_DEFAULT_PORTS = {'t':'51001', 's':'51002'} + PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'} PEERS = [ 'electrum.dash.siampm.com s t', ] diff --git a/lib/hash.py b/lib/hash.py index 7ab28af..4aba0fe 100644 --- a/lib/hash.py +++ b/lib/hash.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Cryptograph hash functions and related classes.''' diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 13f1895..e0d90ae 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -2,8 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. '''Classes for acting as a peer over a transport and speaking the JSON RPC versions 1.0 and 2.0. @@ -41,9 +59,35 @@ class JSONRPC(object): INVALID_ARGS = -32602 INTERNAL_ERROR = -32603 + # Codes for this library + INVALID_RESPONSE = -100 + ERROR_CODE_UNAVAILABLE = -101 + REQUEST_TIMEOUT = -102 + ID_TYPES = (type(None), str, numbers.Number) HAS_BATCHES = False + @classmethod + def canonical_error(cls, error): + '''Convert an error to a JSON RPC 2.0 error. + + Handlers then only have a single form of error to deal with. + ''' + if isinstance(error, int): + error = {'code': error} + elif isinstance(error, str): + error = {'message': error} + elif not isinstance(error, dict): + error = {'data': error} + error['code'] = error.get('code', JSONRPC.ERROR_CODE_UNAVAILABLE) + error['message'] = error.get('message', 'error message unavailable') + return error + + @classmethod + def timeout_error(cls): + return {'message': 'request timed out', + 'code': JSONRPC.REQUEST_TIMEOUT} + class JSONRPCv1(JSONRPC): '''JSON RPC version 1.0.''' @@ -71,23 +115,23 @@ class JSONRPCv1(JSONRPC): @classmethod def handle_response(cls, handler, payload): - '''JSON v1 response handler. Both 'error' and 'response' + '''JSON v1 response handler. Both 'error' and 'result' should exist with exactly one being None. + + Unfortunately many 1.0 clients behave like 2.0, and just send + one or the other. ''' - try: - result = payload['result'] - error = payload['error'] - except KeyError: - pass + error = payload.get('error') + if error is None: + handler(payload.get('result'), None) else: - if (result is None) != (error is None): - handler(result, error) + handler(None, cls.canonical_error(error)) @classmethod def is_request(cls, payload): '''Returns True if the payload (which has a method) is a request. False means it is a notification.''' - return payload.get('id') != None + return payload.get('id') is not None class JSONRPCv2(JSONRPC): @@ -124,17 +168,17 @@ class JSONRPCv2(JSONRPC): @classmethod def handle_response(cls, handler, payload): - '''JSON v2 response handler. Exactly one of 'error' and 'response' + '''JSON v2 response handler. Exactly one of 'error' and 'result' must exist. Errors must have 'code' and 'message' members. ''' - if ('error' in payload) != ('result' in payload): - if 'result' in payload: - handler(payload['result'], None) - else: - error = payload['error'] - if (isinstance(error, dict) and 'code' in error - and 'message' in error): - handler(None, error) + if 'error' in payload: + handler(None, cls.canonical_error(payload['error'])) + elif 'result' in payload: + handler(payload['result'], None) + else: + error = {'message': 'no error or result returned', + 'code': JSONRPC.INVALID_RESPONSE} + handler(None, cls.canonical_error(error)) @classmethod def batch_size(cls, parts): @@ -213,15 +257,49 @@ class JSONSessionBase(util.LoggedClass): responses. Incoming messages are queued. When the queue goes from empty ''' - - NEXT_SESSION_ID = 0 + _next_session_id = 0 + _pending_reqs = {} @classmethod def next_session_id(cls): - session_id = cls.NEXT_SESSION_ID - cls.NEXT_SESSION_ID += 1 + '''Return the next unique session ID.''' + session_id = cls._next_session_id + cls._next_session_id += 1 return session_id + def _pending_request_keys(self): + '''Return a generator of pending request keys for this session.''' + return [key for key in self._pending_reqs if key[0] is self] + + def has_pending_requests(self): + '''Return True if this session has pending requests.''' + return bool(self._pending_request_keys()) + + def pop_response_handler(self, msg_id): + '''Return the response handler for the given message ID.''' + return self._pending_reqs.pop((self, msg_id), (None, None))[0] + + def timeout_session(self): + '''Trigger timeouts for all of the session's pending requests.''' + self._timeout_requests(self._pending_request_keys()) + + @classmethod + def timeout_check(cls): + '''Trigger timeouts where necessary for all pending requests.''' + now = time.time() + keys = [key for key, value in cls._pending_reqs.items() + if value[1] < now] + cls._timeout_requests(keys) + + @classmethod + def _timeout_requests(cls, keys): + '''Trigger timeouts for the given lookup keys.''' + values = [cls._pending_reqs.pop(key) for key in keys] + handlers = [handler for handler, timeout in values] + timeout_error = JSONRPC.timeout_error() + for handler in handlers: + handler(None, timeout_error) + def __init__(self, version=JSONRPCCompat): super().__init__() @@ -245,7 +323,6 @@ class JSONSessionBase(util.LoggedClass): self.batch_results = [] # Handling of outgoing requests self.next_request_id = 0 - self.pending_responses = {} # If buffered incoming data exceeds this the connection is closed self.max_buffer_size = 1000000 self.max_send = 50000 @@ -296,7 +373,7 @@ class JSONSessionBase(util.LoggedClass): '''Extract and return the ID from the payload. Raises an RPCError if it is missing or invalid.''' - if not 'id' in payload: + if 'id' not in payload: raise RPCError('missing id', JSONRPC.INVALID_REQUEST) id_ = payload['id'] @@ -473,7 +550,7 @@ class JSONSessionBase(util.LoggedClass): '''Handle a single JSON response.''' try: id_ = self.check_payload_id(payload) - handler = self.pending_responses.pop(id_, None) + handler = self.pop_response_handler(id_) if handler: self.version.handle_response(handler, payload) else: @@ -593,14 +670,18 @@ class JSONSessionBase(util.LoggedClass): '''Send a JSON error.''' self.send_binary(self.error_bytes(message, code, id_)) - def send_request(self, handler, method, params=None): + def send_request(self, handler, method, params=None, timeout=30): '''Sends a request and arranges for handler to be called with the response when it comes in. + + A call to request_timeout_check() will result in pending + responses that have been waiting more than timeout seconds to + call the handler with a REQUEST_TIMEOUT error. ''' id_ = self.next_request_id self.next_request_id += 1 self.send_binary(self.request_bytes(id_, method, params)) - self.pending_responses[id_] = handler + self._pending_reqs[(self, id_)] = (handler, time.time() + timeout) def send_notification(self, method, params=None): '''Send a notification.''' @@ -679,7 +760,9 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): def peer_info(self): '''Returns information about the peer.''' - return self.transport.get_extra_info('peername') + if self.transport: + return self.transport.get_extra_info('peername') + return None def abort(self): '''Cut the connection abruptly.''' @@ -691,6 +774,10 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): self.transport = transport super().connection_made() + def connection_lost(self, exc): + '''Trigger timeouts of all pending requests.''' + self.timeout_session() + def is_closing(self): '''True if the underlying transport is closing.''' return self.transport and self.transport.is_closing() diff --git a/lib/peer.py b/lib/peer.py new file mode 100644 index 0000000..52ad340 --- /dev/null +++ b/lib/peer.py @@ -0,0 +1,294 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +'''Representation of a peer server.''' + +import re +from ipaddress import ip_address + +from lib.util import cachedproperty + + +class Peer(object): + + # Protocol version + VERSION_REGEX = re.compile('[0-9]+(\.[0-9]+)?$') + ATTRS = ('host', 'features', + # metadata + 'source', 'ip_addr', 'good_ports', + 'last_connect', 'last_try', 'try_count') + PORTS = ('ssl_port', 'tcp_port') + FEATURES = PORTS + ('pruning', 'server_version', + 'protocol_min', 'protocol_max') + # This should be set by the application + DEFAULT_PORTS = {} + + def __init__(self, host, features, source='unknown', ip_addr=None, + good_ports=[], last_connect=0, last_try=0, try_count=0): + '''Create a peer given a host name (or IP address as a string), + a dictionary of features, and a record of the source.''' + assert isinstance(host, str) + assert isinstance(features, dict) + self.host = host + self.features = features.copy() + # Canonicalize / clean-up + for feature in self.FEATURES: + self.features[feature] = getattr(self, feature) + # Metadata + self.source = source + self.ip_addr = ip_addr + self.good_ports = good_ports.copy() + self.last_connect = last_connect + self.last_try = last_try + self.try_count = try_count + # Transient, non-persisted metadata + self.bad = False + self.other_port_pairs = set() + + @classmethod + def peers_from_features(cls, features, source): + peers = [] + if isinstance(features, dict): + hosts = features.get('hosts') + if isinstance(hosts, dict): + peers = [Peer(host, features, source=source) + for host in hosts if isinstance(host, str)] + return peers + + @classmethod + def deserialize(cls, item): + '''Deserialize from a dictionary.''' + return cls(**item) + + @classmethod + def version_tuple(cls, vstr): + '''Convert a version string, such as "1.2", to a (major_version, + minor_version) pair. + ''' + if isinstance(vstr, str) and VERSION_REGEX.match(vstr): + if '.' not in vstr: + vstr += '.0' + else: + vstr = '1.0' + return tuple(int(part) for part in vstr.split('.')) + + def matches(self, peers): + '''Return peers whose host matches the given peer's host or IP + address. This results in our favouring host names over IP + addresses. + ''' + candidates = (self.host.lower(), self.ip_addr) + return [peer for peer in peers if peer.host.lower() in candidates] + + def __str__(self): + return self.host + + def update_features(self, features): + '''Update features in-place.''' + tmp = Peer(self.host, features) + self.features = tmp.features + for feature in self.FEATURES: + setattr(self, feature, getattr(tmp, feature)) + + def connection_port_pairs(self): + '''Return a list of (kind, port) pairs to try when making a + connection.''' + # Use a list not a set - it's important to try the registered + # ports first. + pairs = [('SSL', self.ssl_port), ('TCP', self.tcp_port)] + while self.other_port_pairs: + pairs.append(other_port_pairs.pop()) + return [pair for pair in pairs if pair[1]] + + def mark_bad(self): + '''Mark as bad to avoid reconnects but also to remember for a + while.''' + self.bad = True + + def check_ports(self, other): + '''Remember differing ports in case server operator changed them + or removed one.''' + if other.ssl_port != self.ssl_port: + self.other_port_pairs.add(('SSL', other.ssl_port)) + if other.tcp_port != self.tcp_port: + self.other_port_pairs.add(('TCP', other.tcp_port)) + return bool(self.other_port_pairs) + + @cachedproperty + def is_tor(self): + return self.host.endswith('.onion') + + @cachedproperty + def is_valid(self): + ip = self.ip_address + if not ip: + return True + return not ip.is_multicast and (ip.is_global or ip.is_private) + + @cachedproperty + def is_public(self): + ip = self.ip_address + return self.is_valid and not (ip and ip.is_private) + + @cachedproperty + def ip_address(self): + '''The host as a python ip_address object, or None.''' + try: + return ip_address(self.host) + except ValueError: + return None + + def bucket(self): + if self.is_tor: + return 'onion' + if not self.ip_addr: + return '' + return tuple(self.ip_addr.split('.')[:2]) + + def serialize(self): + '''Serialize to a dictionary.''' + return {attr: getattr(self, attr) for attr in self.ATTRS} + + def _port(self, key): + hosts = self.features.get('hosts') + if isinstance(hosts, dict): + host = hosts.get(self.host) + port = self._integer(key, host) + if port and 0 < port < 65536: + return port + return None + + def _integer(self, key, d=None): + d = d or self.features + result = d.get(key) if isinstance(d, dict) else None + if isinstance(result, str): + try: + result = int(result) + except ValueError: + pass + return result if isinstance(result, int) else None + + def _string(self, key): + result = self.features.get(key) + return result if isinstance(result, str) else None + + def _version_string(self, key): + version = self.features.get(key) + return '{:d}.{:d}'.format(*self.version_tuple(version)) + + @cachedproperty + def genesis_hash(self): + '''Returns None if no SSL port, otherwise the port as an integer.''' + return self._string('genesis_hash') + + @cachedproperty + def ssl_port(self): + '''Returns None if no SSL port, otherwise the port as an integer.''' + return self._port('ssl_port') + + @cachedproperty + def tcp_port(self): + '''Returns None if no TCP port, otherwise the port as an integer.''' + return self._port('tcp_port') + + @cachedproperty + def server_version(self): + '''Returns the server version as a string if known, otherwise None.''' + return self._string('server_version') + + @cachedproperty + def pruning(self): + '''Returns the pruning level as an integer. None indicates no + pruning.''' + pruning = self._integer('pruning') + if pruning and pruning > 0: + return pruning + return None + + @cachedproperty + def protocol_min(self): + '''Minimum protocol version as a string, e.g., 1.0''' + return self._version_string('protcol_min') + + @cachedproperty + def protocol_max(self): + '''Maximum protocol version as a string, e.g., 1.1''' + return self._version_string('protcol_max') + + def to_tuple(self): + '''The tuple ((ip, host, details) expected in response + to a peers subscription.''' + details = self.real_name().split()[1:] + return (self.ip_addr or self.host, self.host, details) + + def real_name(self, host_override=None): + '''Real name of this peer as used on IRC.''' + def port_text(letter, port): + if port == self.DEFAULT_PORTS.get(letter): + return letter + else: + return letter + str(port) + + parts = [host_override or self.host, 'v' + self.protocol_max] + if self.pruning: + parts.append('p{:d}'.format(self.pruning)) + for letter, port in (('s', self.ssl_port), ('t', self.tcp_port)): + if port: + parts.append(port_text(letter, port)) + return ' '.join(parts) + + @classmethod + def from_real_name(cls, real_name, source): + '''Real name is a real name as on IRC, such as + + "erbium1.sytes.net v1.0 s t" + + Returns an instance of this Peer class. + ''' + host = 'nohost' + features = {} + ports = {} + for n, part in enumerate(real_name.split()): + if n == 0: + host = part + continue + if part[0] in ('s', 't'): + if len(part) == 1: + port = cls.DEFAULT_PORTS[part[0]] + else: + port = part[1:] + if part[0] == 's': + ports['ssl_port'] = port + else: + ports['tcp_port'] = port + elif part[0] == 'v': + features['protocol_max'] = features['protocol_min'] = part[1:] + elif part[0] == 'p': + features['pruning'] = part[1:] + + features.update(ports) + features['hosts'] = {host: ports} + + return cls(host, features, source) diff --git a/lib/script.py b/lib/script.py index fbb2337..2c35184 100644 --- a/lib/script.py +++ b/lib/script.py @@ -2,7 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Script-related classes and functions.''' diff --git a/lib/socks.py b/lib/socks.py new file mode 100644 index 0000000..86fddb6 --- /dev/null +++ b/lib/socks.py @@ -0,0 +1,181 @@ +# Copyright (c) 2017, Neil Booth +# +# All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# and warranty status of this software. + +'''Socks proxying.''' + +import asyncio +import ipaddress +import logging +import socket +import struct +from functools import partial + +import lib.util as util + + +class Socks(util.LoggedClass): + '''Socks protocol wrapper.''' + + SOCKS5_ERRORS = { + 1: 'general SOCKS server failure', + 2: 'connection not allowed by ruleset', + 3: 'network unreachable', + 4: 'host unreachable', + 5: 'connection refused', + 6: 'TTL expired', + 7: 'command not supported', + 8: 'address type not supported', + } + + class Error(Exception): + pass + + def __init__(self, loop, sock, host, port): + super().__init__() + self.loop = loop + self.sock = sock + self.host = host + self.port = port + try: + self.ip_address = ipaddress.ip_address(host) + except ValueError: + self.ip_address = None + self.debug = False + + async def _socks4_handshake(self): + if self.ip_address: + # Socks 4 + ip_addr = self.ip_address + host_bytes = b'' + else: + # Socks 4a + ip_addr = ipaddress.ip_address('0.0.0.1') + host_bytes = self.host.encode() + b'\0' + + user_id = '' + data = b'\4\1' + struct.pack('>H', self.port) + ip_addr.packed + data += user_id.encode() + b'\0' + host_bytes + await self.loop.sock_sendall(self.sock, data) + data = await self.loop.sock_recv(self.sock, 8) + if data[0] != 0: + raise self.Error('proxy sent bad initial Socks4 byte') + if data[1] != 0x5a: + raise self.Error('proxy request failed or rejected') + + async def _socks5_handshake(self): + await self.loop.sock_sendall(self.sock, b'\5\1\0') + data = await self.loop.sock_recv(self.sock, 2) + if data[0] != 5: + raise self.Error('proxy sent bad SOCKS5 initial byte') + if data[1] != 0: + raise self.Error('proxy rejected SOCKS5 authentication method') + + if self.ip_address: + if self.ip_address.version == 4: + addr = b'\1' + self.ip_address.packed + else: + addr = b'\4' + self.ip_address.packed + else: + host = self.host.encode() + addr = b'\3' + bytes([len(host)]) + host + + data = b'\5\1\0' + addr + struct.pack('>H', self.port) + await self.loop.sock_sendall(self.sock, data) + data = await self.loop.sock_recv(self.sock, 5) + if data[0] != 5: + raise self.Error('proxy sent bad SOSCK5 response initial byte') + if data[1] != 0: + msg = self.SOCKS5_ERRORS.get(data[1], 'unknown SOCKS5 error {:d}' + .format(data[1])) + raise self.Error(msg) + if data[3] == 1: + addr_len, data = 3, data[4:] + elif data[3] == 3: + addr_len, data = data[4], b'' + elif data[3] == 4: + addr_len, data = 15, data[4:] + data = await self.loop.sock_recv(self.sock, addr_len + 2) + addr = data[:addr_len] + port, = struct.unpack('>H', data[-2:]) + + async def handshake(self): + '''Write the proxy handshake sequence.''' + if self.ip_address and self.ip_address.version == 6: + await self._socks5_handshake() + else: + await self._socks4_handshake() + + if self.debug: + address = (self.host, self.port) + self.log_info('successful connection via proxy to {}' + .format(util.address_string(address))) + + +class SocksProxy(util.LoggedClass): + + def __init__(self, host, port, loop=None): + '''Host can be an IPv4 address, IPv6 address, or a host name.''' + super().__init__() + self.host = host + self.port = port + self.ip_addr = None + self.loop = loop or asyncio.get_event_loop() + + async def create_connection(self, protocol_factory, host, port, ssl=None): + '''All arguments are as to asyncio's create_connection method.''' + if self.port is None: + proxy_ports = [9050, 9150, 1080] + else: + proxy_ports = [self.port] + + for proxy_port in proxy_ports: + address = (self.host, proxy_port) + sock = socket.socket() + sock.setblocking(False) + try: + await self.loop.sock_connect(sock, address) + except OSError as e: + if proxy_port == proxy_ports[-1]: + raise + continue + + socks = Socks(self.loop, sock, host, port) + try: + await socks.handshake() + if self.port is None: + self.ip_addr = sock.getpeername()[0] + self.port = proxy_port + self.logger.info('detected proxy at {} ({})' + .format(util.address_string(address), + self.ip_addr)) + break + except Exception as e: + sock.close() + raise + + hostname = host if ssl else None + return await self.loop.create_connection( + protocol_factory, ssl=ssl, sock=sock, server_hostname=hostname) diff --git a/lib/tx.py b/lib/tx.py index 31d2186..ceed127 100644 --- a/lib/tx.py +++ b/lib/tx.py @@ -2,7 +2,26 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Transaction-related classes and functions.''' @@ -24,6 +43,7 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")): # FIXME: add hash as a cached property? + class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")): '''Class representing a transaction input.''' @@ -32,8 +52,8 @@ class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")): @cachedproperty def is_coinbase(self): - return (self.prev_hash == TxInput.ZERO - and self.prev_idx == TxInput.MINUS_1) + return (self.prev_hash == TxInput.ZERO and + self.prev_idx == TxInput.MINUS_1) @cachedproperty def script_sig_info(self): @@ -98,10 +118,10 @@ class Deserializer(object): def _read_input(self): return TxInput( - self._read_nbytes(32), # prev_hash - self._read_le_uint32(), # prev_idx - self._read_varbytes(), # script - self._read_le_uint32() # sequence + self._read_nbytes(32), # prev_hash + self._read_le_uint32(), # prev_idx + self._read_varbytes(), # script + self._read_le_uint32() # sequence ) def _read_outputs(self): diff --git a/lib/util.py b/lib/util.py index e5c8674..e35d3b8 100644 --- a/lib/util.py +++ b/lib/util.py @@ -2,15 +2,34 @@ # # All rights reserved. # -# See the file "LICENCE" for information about the copyright +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # and warranty status of this software. '''Miscellaneous utility classes and functions.''' import array -import asyncio import inspect +from ipaddress import ip_address import logging import sys from collections import Container, Mapping @@ -109,11 +128,12 @@ def deep_getsizeof(obj): return size(obj) + def subclasses(base_class, strict=True): '''Return a list of subclasses of base_class in its module.''' def select(obj): - return (inspect.isclass(obj) and issubclass(obj, base_class) - and (not strict or obj != base_class)) + return (inspect.isclass(obj) and issubclass(obj, base_class) and + (not strict or obj != base_class)) pairs = inspect.getmembers(sys.modules[base_class.__module__], select) return [pair[1] for pair in pairs] @@ -202,3 +222,22 @@ def open_file(filename, create=False): if create: return open(filename, 'wb+') raise + + +def open_truncate(filename): + '''Open the file name. Return its handle.''' + return open(filename, 'wb+') + + +def address_string(address): + '''Return an address as a correctly formatted string.''' + fmt = '{}:{:d}' + host, port = address + try: + host = ip_address(host) + except ValueError: + pass + else: + if host.version == 6: + fmt = '[{}]:{:d}' + return fmt.format(host, port) diff --git a/query.py b/query.py index 283f33d..995ce7b 100755 --- a/query.py +++ b/query.py @@ -71,5 +71,6 @@ def main(): print('Balance: {} {}'.format(coin.decimal_value(balance), coin.SHORTNAME)) + if __name__ == '__main__': main() diff --git a/server/block_processor.py b/server/block_processor.py index c8bdd22..3093263 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -87,10 +87,11 @@ class Prefetcher(LoggedClass): with await self.semaphore: while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. - # Constrain fetch count to between 0 and 2500 regardless. + # Constrain fetch count to between 0 and 500 regardless; + # testnet can be lumpy. cache_room = self.min_cache_size // self.ave_size count = min(daemon_height - self.fetched_height, cache_room) - count = min(2500, max(count, 0)) + count = min(500, max(count, 0)) if not count: if not self.caught_up: self.caught_up = True @@ -568,7 +569,7 @@ class BlockProcessor(server.db.DB): header = coin.block_header(block, self.height) header_hash = coin.header_hash(header) if header_hash != self.tip: - raise ChainError('backup block {} is not tip {} at height {:,d}' + raise ChainError('backup block {} not tip {} at height {:,d}' .format(hash_to_str(header_hash), hash_to_str(self.tip), self.height)) self.tip = coin.header_prevhash(header) @@ -735,7 +736,7 @@ class BlockProcessor(server.db.DB): for cache_key, cache_value in self.utxo_cache.items(): # suffix = tx_idx + tx_num hashX = cache_value[:-12] - suffix = cache_key[-2:] + cache_value[-12:-8] + suffix = cache_key[-2:] + cache_value[-12:-8] batch_put(b'h' + cache_key[:4] + suffix, hashX) batch_put(b'u' + hashX + suffix, cache_value[-8:]) self.utxo_cache = {} diff --git a/server/controller.py b/server/controller.py index db161c8..e5b2898 100644 --- a/server/controller.py +++ b/server/controller.py @@ -6,7 +6,6 @@ # and warranty status of this software. import asyncio -import codecs import json import os import ssl @@ -20,7 +19,7 @@ from functools import partial import pylru -from lib.jsonrpc import JSONRPC, RPCError +from lib.jsonrpc import JSONRPC, JSONSessionBase, RPCError from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor @@ -28,7 +27,6 @@ from server.daemon import Daemon, DaemonError from server.mempool import MemPool from server.peers import PeerManager from server.session import LocalRPC, ElectrumX -from server.version import VERSION class Controller(util.LoggedClass): @@ -88,8 +86,7 @@ class Controller(util.LoggedClass): 'address.get_proof address.listunspent ' 'block.get_header block.get_chunk estimatefee relayfee ' 'transaction.get transaction.get_merkle utxo.get_address'), - ('server', - 'banner donation_address'), + ('server', 'donation_address'), ] self.electrumx_handlers = {'.'.join([prefix, suffix]): getattr(self, suffix.replace('.', '_')) @@ -170,7 +167,7 @@ class Controller(util.LoggedClass): def enqueue_session(self, session): # Might have disconnected whilst waiting - if not session in self.sessions: + if session not in self.sessions: return priority = self.session_priority(session) item = (priority, self.next_queue_id, session) @@ -218,15 +215,21 @@ class Controller(util.LoggedClass): def on_future_done(self, future): '''Collect the result of a future after removing it from our set.''' callback = self.futures.pop(future) - if callback: - callback(future) - else: - try: + try: + if callback: + callback(future) + else: future.result() - except asyncio.CancelledError: - pass - except Exception: - self.log_error(traceback.format_exc()) + except asyncio.CancelledError: + pass + except Exception: + self.log_error(traceback.format_exc()) + + async def check_request_timeouts(self): + '''Regularly check pending JSON requests for timeouts.''' + while True: + await asyncio.sleep(30) + JSONSessionBase.timeout_check() async def wait_for_bp_catchup(self): '''Called when the block processor catches up.''' @@ -234,6 +237,7 @@ class Controller(util.LoggedClass): self.logger.info('block processor has caught up') self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.start_servers()) + self.ensure_future(self.check_request_timeouts()) self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.enqueue_delayed_sessions()) self.ensure_future(self.notify()) @@ -242,6 +246,8 @@ class Controller(util.LoggedClass): async def main_loop(self): '''Controller main loop.''' + if self.env.rpc_port is not None: + await self.start_server('RPC', 'localhost', self.env.rpc_port) self.ensure_future(self.bp.main_loop()) self.ensure_future(self.wait_for_bp_catchup()) @@ -269,12 +275,15 @@ class Controller(util.LoggedClass): for session in self.sessions: self.close_session(session) + # This might resolve "future never awaited" log + await asyncio.sleep(0) + # Cancel pending futures for future in self.futures: future.cancel() # Wait for all futures to finish - while not all (future.done() for future in self.futures): + while not all(future.done() for future in self.futures): await asyncio.sleep(0.1) # Finally shut down the block processor and executor @@ -306,9 +315,7 @@ class Controller(util.LoggedClass): .format(kind, host, port)) async def start_servers(self): - '''Start RPC, TCP and SSL servers once caught up.''' - if self.env.rpc_port is not None: - await self.start_server('RPC', 'localhost', self.env.rpc_port) + '''Start TCP and SSL servers.''' self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('session timeout: {:,d} seconds' .format(self.env.session_timeout)) @@ -329,7 +336,7 @@ class Controller(util.LoggedClass): ''' self.state = self.LISTENING - env= self.env + env = self.env if env.tcp_port is not None: await self.start_server('TCP', env.host, env.tcp_port) if env.ssl_port is not None: @@ -387,7 +394,7 @@ class Controller(util.LoggedClass): .format(session.kind, session.peername(), len(self.sessions))) if (len(self.sessions) >= self.max_sessions - and self.state == self.LISTENING): + and self.state == self.LISTENING): self.state = self.PAUSED session.log_info('maximum sessions {:,d} reached, stopping new ' 'connections until count drops to {:,d}' @@ -459,7 +466,7 @@ class Controller(util.LoggedClass): 'logged': len([s for s in self.sessions if s.log_me]), 'paused': sum(s.pause for s in self.sessions), 'pid': os.getpid(), - 'peers': self.peer_mgr.count(), + 'peers': self.peer_mgr.info(), 'requests': sum(s.count_pending_items() for s in self.sessions), 'sessions': self.session_count(), 'subs': self.sub_count(), @@ -511,6 +518,38 @@ class Controller(util.LoggedClass): ]) return result + @staticmethod + def peers_text_lines(data): + '''A generator returning lines for a list of peers. + + data is the return value of rpc_peers().''' + def time_fmt(t): + if not t: + return 'Never' + return util.formatted_time(now - t) + + now = time.time() + fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>3} ' + '{:>3} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}') + yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min', + 'Max', 'Pruning', 'Last Conn', 'Last Try', + 'Tries', 'Source', 'IP Address') + for item in data: + features = item['features'] + yield fmt.format(item['host'][:30], + item['status'], + features['tcp_port'] or '', + features['ssl_port'] or '', + features['server_version'] or 'unknown', + features['protocol_min'], + features['protocol_max'], + features['pruning'] or '', + time_fmt(item['last_connect']), + time_fmt(item['last_try']), + item['try_count'], + item['source'][:20], + item['ip_addr'] or '') + @staticmethod def sessions_text_lines(data): '''A generator returning lines for a list of sessions. @@ -872,33 +911,6 @@ class Controller(util.LoggedClass): # Client RPC "server" command handlers - async def banner(self): - '''Return the server banner text.''' - banner = 'Welcome to Electrum!' - if self.env.banner_file: - try: - with codecs.open(self.env.banner_file, 'r', 'utf-8') as f: - banner = f.read() - except Exception as e: - self.log_error('reading banner file {}: {}' - .format(self.env.banner_file, e)) - else: - network_info = await self.daemon_request('getnetworkinfo') - version = network_info['version'] - major, minor = divmod(version, 1000000) - minor, revision = divmod(minor, 10000) - revision //= 100 - version = '{:d}.{:d}.{:d}'.format(major, minor, revision) - for pair in [ - ('$VERSION', VERSION), - ('$DAEMON_VERSION', version), - ('$DAEMON_SUBVERSION', network_info['subversion']), - ('$DONATION_ADDRESS', self.env.donation_address), - ]: - banner = banner.replace(*pair) - - return banner - def donation_address(self): '''Return the donation address as a string, empty if there is none.''' return self.env.donation_address diff --git a/server/daemon.py b/server/daemon.py index e5d1fda..d52e973 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -20,6 +20,7 @@ import lib.util as util class DaemonError(Exception): '''Raised when the daemon returns an error in its results.''' + class Daemon(util.LoggedClass): '''Handles connections to a daemon at the given URL.''' diff --git a/server/db.py b/server/db.py index df92f38..6a3d4ef 100644 --- a/server/db.py +++ b/server/db.py @@ -23,6 +23,7 @@ from server.version import VERSION UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + class DB(util.LoggedClass): '''Simple wrapper of the backend database for querying. diff --git a/server/env.py b/server/env.py index b86cb95..afd8b1b 100644 --- a/server/env.py +++ b/server/env.py @@ -44,8 +44,14 @@ class Env(LoggedClass): self.rpc_port = self.integer('RPC_PORT', 8000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) + self.tor_banner_file = self.default('TOR_BANNER_FILE', + self.banner_file) self.anon_logs = self.default('ANON_LOGS', False) self.log_sessions = self.integer('LOG_SESSIONS', 3600) + # Tor proxy + # Python 3.5.3 - revert back to localhost? + self.tor_proxy_host = self.default('TOR_PROXY_HOST', '127.0.0.1') + self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None) # The electrum client takes the empty string as unspecified self.donation_address = self.default('DONATION_ADDRESS', '') self.db_engine = self.default('DB_ENGINE', 'leveldb') @@ -60,31 +66,33 @@ class Env(LoggedClass): self.irc = self.default('IRC', False) self.irc_nick = self.default('IRC_NICK', None) - self.identity = NetIdentity( + # Identities + main_identity = NetIdentity( self.default('REPORT_HOST', self.host), self.integer('REPORT_TCP_PORT', self.tcp_port) or None, self.integer('REPORT_SSL_PORT', self.ssl_port) or None, '' ) - self.tor_identity = NetIdentity( - self.default('REPORT_HOST_TOR', ''), # must be a string - self.integer('REPORT_TCP_PORT_TOR', - self.identity.tcp_port - if self.identity.tcp_port else - self.tcp_port) or None, - self.integer('REPORT_SSL_PORT_TOR', - self.identity.ssl_port - if self.identity.ssl_port else - self.ssl_port) or None, - '_tor' - ) - - if self.irc: - if not self.identity.host.strip(): - raise self.Error('IRC host is empty') - if self.identity.tcp_port == self.identity.ssl_port: - raise self.Error('IRC TCP and SSL ports are the same') - + if not main_identity.host.strip(): + raise self.Error('IRC host is empty') + if main_identity.tcp_port == main_identity.ssl_port: + raise self.Error('IRC TCP and SSL ports are the same') + + self.identities = [main_identity] + tor_host = self.default('REPORT_HOST_TOR', '') + if tor_host.endswith('.onion'): + self.identities.append(NetIdentity( + tor_host, + self.integer('REPORT_TCP_PORT_TOR', + main_identity.tcp_port + if main_identity.tcp_port else + self.tcp_port) or None, + self.integer('REPORT_SSL_PORT_TOR', + main_identity.ssl_port + if main_identity.ssl_port else + self.ssl_port) or None, + '_tor', + )) def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/irc.py b/server/irc.py index 1df04c8..ace4d04 100644 --- a/server/irc.py +++ b/server/irc.py @@ -50,7 +50,7 @@ class IRC(LoggedClass): # Register handlers for events we're interested in reactor = irc_client.Reactor() - for event in 'welcome join quit whoreply disconnect'.split(): + for event in 'welcome join whoreply disconnect'.split(): reactor.add_global_handler(event, getattr(self, 'on_' + event)) # Note: Multiple nicks in same channel will trigger duplicate events @@ -96,12 +96,6 @@ class IRC(LoggedClass): if match: connection.who(match.group(1)) - def on_quit(self, connection, event): - '''Called when someone leaves our channel.''' - match = self.peer_regexp.match(event.source) - if match: - self.peer_mgr.remove_irc_peer(match.group(1)) - def on_whoreply(self, connection, event): '''Called when a response to our who requests arrives. @@ -111,8 +105,8 @@ class IRC(LoggedClass): nick = event.arguments[4] if nick.startswith(self.prefix): line = event.arguments[6].split() - hostname, details = line[1], line[2:] - self.peer_mgr.add_irc_peer(nick, hostname, details) + hp_string = ' '.join(line[1:]) # hostname, ports, version etc. + self.peer_mgr.add_irc_peer(nick, hp_string) class IrcClient(object): diff --git a/server/mempool.py b/server/mempool.py index 387b12c..35d177f 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -93,8 +93,8 @@ class MemPool(util.LoggedClass): process_some = self.async_process_some(unfetched, fetch_size // 2) await self.daemon.mempool_refresh_event.wait() - self.logger.info ('beginning processing of daemon mempool. ' - 'This can take some time...') + self.logger.info('beginning processing of daemon mempool. ' + 'This can take some time...') next_log = 0 loops = -1 # Zero during initial catchup @@ -162,8 +162,8 @@ class MemPool(util.LoggedClass): deferred = pending pending = [] - result, deferred = await self.controller.run_in_executor \ - (self.process_raw_txs, raw_txs, deferred) + result, deferred = await self.controller.run_in_executor( + self.process_raw_txs, raw_txs, deferred) pending.extend(deferred) hashXs = self.hashXs @@ -187,7 +187,7 @@ class MemPool(util.LoggedClass): # Skip hashes the daemon has dropped. Either they were # evicted or they got in a block. - return {hh:raw for hh, raw in zip(hex_hashes, raw_txs) if raw} + return {hh: raw for hh, raw in zip(hex_hashes, raw_txs) if raw} def process_raw_txs(self, raw_tx_map, pending): '''Process the dictionary of raw transactions and return a dictionary @@ -204,7 +204,7 @@ class MemPool(util.LoggedClass): # Deserialize each tx and put it in our priority queue for tx_hash, raw_tx in raw_tx_map.items(): - if not tx_hash in txs: + if tx_hash not in txs: continue tx, _tx_hash = deserializer(raw_tx).read_tx() @@ -267,7 +267,7 @@ class MemPool(util.LoggedClass): unconfirmed is True if any txin is unconfirmed. ''' # hashXs is a defaultdict - if not hashX in self.hashXs: + if hashX not in self.hashXs: return [] deserializer = self.coin.deserializer() @@ -279,8 +279,8 @@ class MemPool(util.LoggedClass): if not item or not raw_tx: continue txin_pairs, txout_pairs = item - tx_fee = (sum(v for hashX, v in txin_pairs) - - sum(v for hashX, v in txout_pairs)) + tx_fee = (sum(v for hashX, v in txin_pairs) - + sum(v for hashX, v in txout_pairs)) tx, tx_hash = deserializer(raw_tx).read_tx() unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs) result.append((hex_hash, tx_fee, unconfirmed)) diff --git a/server/peers.py b/server/peers.py index c7fdaa2..c5bb5c3 100644 --- a/server/peers.py +++ b/server/peers.py @@ -7,15 +7,184 @@ '''Peer management.''' +import ast import asyncio -import socket -from collections import namedtuple +import random +import ssl +import time +from collections import defaultdict, Counter +from functools import partial +from lib.jsonrpc import JSONSession +from lib.peer import Peer +from lib.socks import SocksProxy import lib.util as util from server.irc import IRC +import server.version as version -IRCPeer = namedtuple('IRCPeer', 'ip_addr host details') +PEERS_FILE = 'peers' +PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) +STALE_SECS = 86400 +WAKEUP_SECS = 300 + + +def peer_from_env(env): + '''Return ourself as a peer from the environment settings.''' + main_identity = env.identities[0] + hosts = {identity.host: {'tcp_port': identity.tcp_port, + 'ssl_port': identity.ssl_port} + for identity in env.identities} + features = { + 'hosts': hosts, + 'pruning': None, + 'server_version': version.VERSION, + 'protocol_min': version.PROTOCOL_MIN, + 'protocol_max': version.PROTOCOL_MAX, + 'genesis_hash': env.coin.GENESIS_HASH, + } + + return Peer(main_identity.host, features, 'env') + + +class PeerSession(JSONSession): + '''An outgoing session to a peer.''' + + def __init__(self, peer, peer_mgr, kind): + super().__init__() + self.max_send = 0 + self.peer = peer + self.peer_mgr = peer_mgr + self.kind = kind + self.failed = False + self.log_prefix = '[{}] '.format(self.peer) + + def have_pending_items(self): + self.peer_mgr.ensure_future(self.process_pending_items()) + + def connection_made(self, transport): + '''Handle an incoming client connection.''' + super().connection_made(transport) + self.log_prefix = '[{}] '.format(str(self.peer)[:25]) + + # Update IP address + if not self.peer.is_tor: + peer_info = self.peer_info() + if peer_info: + self.peer.ip_addr = peer_info[0] + + # Collect data + proto_ver = (version.PROTOCOL_MIN, version.PROTOCOL_MAX) + self.send_request(self.on_version, 'server.version', + [version.VERSION, proto_ver]) + self.send_request(self.on_peers_subscribe, 'server.peers.subscribe') + self.send_request(self.on_features, 'server.features') + + def connection_lost(self, exc): + '''Handle disconnection.''' + super().connection_lost(exc) + self.peer_mgr.connection_lost(self) + + def on_peers_subscribe(self, result, error): + '''Handle the response to the peers.subcribe message.''' + if error: + self.failed = True + self.log_error('server.peers.subscribe: {}'.format(error)) + else: + self.check_remote_peers(result) + self.close_if_done() + + def check_remote_peers(self, updates): + '''When a peer gives us a peer update. + + Each update is expected to be of the form: + [ip_addr, hostname, ['v1.0', 't51001', 's51002']] + + Return True if we're in the list of peers. + ''' + try: + real_names = [' '.join([u[1]] + u[2]) for u in updates] + peers = [Peer.from_real_name(real_name, str(self.peer)) + for real_name in real_names] + except Exception: + self.log_error('bad server.peers.subscribe response') + return False + + self.peer_mgr.add_peers(peers) + my = self.peer_mgr.myself + for peer in my.matches(peers): + if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port: + return + + # Announce ourself if not present + self.log_info('registering with server.add_peer') + self.send_request(self.on_add_peer, 'server.add_peer', [my.features]) + + def on_add_peer(self, result, error): + '''Handle the response to the add_peer message.''' + self.close_if_done() + + def on_features(self, features, error): + # Several peers don't implement this. If they do, check they are + # the same network with the genesis hash. + verified = False + if not error and isinstance(features, dict): + forget = False + our_hash = self.peer_mgr.env.coin.GENESIS_HASH + their_hash = features.get('genesis_hash') + if their_hash: + verified = their_hash == our_hash + forget = their_hash != our_hash + if forget: + self.failed = True + self.peer.mark_bad() + self.log_warning('incorrect genesis hash') + else: + self.peer.update_features(features) + # For legacy peers not implementing features, check their height + # as a proxy to determining they're on our network + if not verified: + self.send_request(self.on_headers, 'blockchain.headers.subscribe') + self.close_if_done() + + def on_headers(self, result, error): + '''Handle the response to the version message.''' + if error or not isinstance(result, dict): + self.failed = True + self.log_error('bad blockchain.headers.subscribe response') + else: + our_height = self.peer_mgr.controller.bp.db_height + their_height = result.get('block_height') + if (not isinstance(their_height, int) or + abs(our_height - their_height) > 5): + self.failed = True + self.peer.mark_bad() + self.log_warning('bad height {}'.format(their_height)) + self.close_if_done() + + def on_version(self, result, error): + '''Handle the response to the version message.''' + if error: + self.failed = True + self.log_error('server.version returned an error') + elif isinstance(result, str): + self.peer.server_version = result + self.peer.features['server_version'] = result + self.close_if_done() + + def close_if_done(self): + if not self.has_pending_requests(): + is_good = not self.failed + self.peer.last_connect = time.time() + self.peer_mgr.set_connection_status(self.peer, is_good) + if is_good: + if self.peer.is_tor: + self.log_info('verified via {} over Tor'.format(self.kind)) + else: + self.log_info('verified via {} at {}' + .format(self.kind, + self.peer_addr(anon=False))) + self.close_connection() class PeerManager(util.LoggedClass): @@ -24,88 +193,325 @@ class PeerManager(util.LoggedClass): Attempts to maintain a connection with up to 8 peers. Issues a 'peers.subscribe' RPC to them and tells them our data. ''' - PROTOCOL_VERSION = '1.0' - def __init__(self, env, controller): super().__init__() + # Initialise the Peer class + Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS self.env = env self.controller = controller + self.loop = controller.loop self.irc = IRC(env, self) - self.pruning = None - self._identities = [] - # Keyed by nick - self.irc_peers = {} - self._identities.append(env.identity) - if env.tor_identity.host.endswith('.onion'): - self._identities.append(env.tor_identity) - - def real_name(self, host, protocol_version, tcp_port, ssl_port): - '''Real name as used on IRC.''' - default_ports = self.env.coin.PEER_DEFAULT_PORTS - - def port_text(letter, port): - if port == default_ports.get(letter): - return letter + self.myself = peer_from_env(env) + # value is max outgoing connections at a time + self.semaphore = asyncio.BoundedSemaphore(value=8) + self.retry_event = asyncio.Event() + # Peers have one entry per hostname. Once connected, the + # ip_addr property is either None, an onion peer, or the + # IP address that was connected to. Adding a peer will evict + # any other peers with the same host name or IP address. + self.peers = set() + self.onion_peers = [] + self.last_tor_retry_time = 0 + self.tor_proxy = SocksProxy(env.tor_proxy_host, env.tor_proxy_port, + loop=self.loop) + self.import_peers() + + def info(self): + '''The number of peers.''' + self.set_peer_statuses() + counter = Counter(peer.status for peer in self.peers) + return { + 'bad': counter[PEER_BAD], + 'good': counter[PEER_GOOD], + 'never': counter[PEER_NEVER], + 'stale': counter[PEER_STALE], + 'total': len(self.peers), + } + + def set_peer_statuses(self): + '''Set peer statuses.''' + cutoff = time.time() - STALE_SECS + for peer in self.peers: + if peer.bad: + peer.status = PEER_BAD + elif peer.last_connect > cutoff: + peer.status = PEER_GOOD + elif peer.last_connect: + peer.status = PEER_STALE + else: + peer.status = PEER_NEVER + + def rpc_data(self): + '''Peer data for the peers RPC method.''' + self.set_peer_statuses() + descs = ['good', 'stale', 'never', 'bad'] + + def peer_data(peer): + data = peer.serialize() + data['status'] = descs[peer.status] + return data + + def peer_key(peer): + return (peer.bad, -peer.last_connect) + + return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] + + def add_peers(self, peers, limit=3, check_ports=False, source=None): + '''Add a limited number of peers that are not already present.''' + retry = False + new_peers = [] + for peer in peers: + matches = peer.matches(self.peers) + if not matches: + new_peers.append(peer) + elif check_ports: + for match in matches: + if match.check_ports(peer): + self.logger.info('ports changed for {}'.format(peer)) + retry = True + + if new_peers: + retry = True + source = source or new_peers[0].source + if limit: + random.shuffle(new_peers) + use_peers = new_peers[:limit] + else: + use_peers = new_peers + self.logger.info('accepted {:d}/{:d} new peers of {:d} from {}' + .format(len(use_peers), len(new_peers), + len(peers), source)) + self.peers.update(use_peers) + + if retry: + self.retry_event.set() + + def on_add_peer(self, features, source): + '''Add peers from an incoming connection.''' + peers = Peer.peers_from_features(features, source) + if peers: + self.log_info('add_peer request received from {}' + .format(peers[0].host)) + self.add_peers(peers, check_ports=True) + return bool(peers) + + def on_peers_subscribe(self, is_tor): + '''Returns the server peers as a list of (ip, host, details) tuples. + + We return all peers we've connected to in the last day. + Additionally, if we don't have onion routing, we return up to + three randomly selected onion servers. + ''' + cutoff = time.time() - STALE_SECS + recent = [peer for peer in self.peers + if peer.last_connect > cutoff and + not peer.bad and peer.is_public] + onion_peers = [] + + # Always report ourselves if valid (even if not public) + peers = set() + if self.myself.last_connect > cutoff: + peers.add(self.myself) + + # Bucket the clearnet peers and select one from each + buckets = defaultdict(list) + for peer in recent: + if peer.is_tor: + onion_peers.append(peer) else: - return letter + str(port) + buckets[peer.bucket()].append(peer) + peers.update(random.choice(bpeers) for bpeers in buckets.values()) + + # Add up to 20% onion peers (but up to 10 is OK anyway) + onion_peers = onion_peers or self.onion_peers + random.shuffle(onion_peers) + max_onion = 50 if is_tor else max(10, len(peers) // 4) + + peers.update(onion_peers[:max_onion]) + + return [peer.to_tuple() for peer in peers] + + def serialize(self): + serialized_peers = [peer.serialize() for peer in self.peers + if not peer.bad] + data = (1, serialized_peers) # version 1 + return repr(data) - parts = [host, 'v' + protocol_version] - for letter, port in (('s', ssl_port), ('t', tcp_port)): - if port: - parts.append(port_text(letter, port)) - return ' '.join(parts) + def write_peers_file(self): + with util.open_truncate(PEERS_FILE) as f: + f.write(self.serialize().encode()) + self.logger.info('wrote out {:,d} peers'.format(len(self.peers))) - def irc_name_pairs(self): - return [(self.real_name(identity.host, self.PROTOCOL_VERSION, - identity.tcp_port, identity.ssl_port), - identity.nick_suffix) - for identity in self._identities] + def read_peers_file(self): + try: + with util.open_file(PEERS_FILE, create=True) as f: + data = f.read(-1).decode() + except Exception as e: + self.logger.error('error reading peers file {}'.format(e)) + else: + if data: + version, items = ast.literal_eval(data) + if version == 1: + peers = [Peer.deserialize(item) for item in items] + self.add_peers(peers, source='peers file', limit=None) + + def import_peers(self): + '''Import hard-coded peers from a file or the coin defaults.''' + self.add_peers([self.myself]) + coin_peers = self.env.coin.PEERS + self.onion_peers = [Peer.from_real_name(rn, 'coins.py') + for rn in coin_peers if '.onion ' in rn] + + # If we don't have many peers in the peers file, add + # hard-coded ones + self.read_peers_file() + if len(self.peers) < 5: + peers = [Peer.from_real_name(real_name, 'coins.py') + for real_name in coin_peers] + self.add_peers(peers, limit=None) + + def connect_to_irc(self): + '''Connect to IRC if not disabled.''' + if self.env.irc and self.env.coin.IRC_PREFIX: + pairs = [(self.myself.real_name(ident.host), ident.nick_suffix) + for ident in self.env.identities] + self.ensure_future(self.irc.start(pairs)) + else: + self.logger.info('IRC is disabled') - def identities(self): - '''Return a list of network identities of this server.''' - return self._identities + def add_irc_peer(self, nick, real_name): + '''Add an IRC peer.''' + peer = Peer.from_real_name(real_name, '{}'.format(nick)) + self.add_peers([peer]) def ensure_future(self, coro, callback=None): '''Schedule the coro to be run.''' return self.controller.ensure_future(coro, callback=callback) async def main_loop(self): - '''Not a loop for now...''' - if self.env.irc: - self.ensure_future(self.irc.start(self.irc_name_pairs())) - else: - self.logger.info('IRC is disabled') + '''Main loop performing peer maintenance. This includes - def dns_lookup_peer(self, nick, hostname, details): + 1) Forgetting unreachable peers. + 2) Verifying connectivity of new peers. + 3) Retrying old peers at regular intervals. + ''' + self.connect_to_irc() try: - ip_addr = None - try: - ip_addr = socket.gethostbyname(hostname) - except socket.error: - pass # IPv6? - ip_addr = ip_addr or hostname - self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) - self.logger.info('new IRC peer {} at {} ({})' - .format(nick, hostname, details)) - except UnicodeError: - # UnicodeError comes from invalid domains (issue #68) - self.logger.info('IRC peer domain {} invalid'.format(hostname)) - - def add_irc_peer(self, *args): - '''Schedule DNS lookup of peer.''' - self.controller.schedule_executor(self.dns_lookup_peer, *args) - - def remove_irc_peer(self, nick): - '''Remove a peer from our IRC peers map.''' - self.logger.info('removing IRC peer {}'.format(nick)) - self.irc_peers.pop(nick, None) - - def count(self): - return len(self.irc_peers) + while True: + timeout = self.loop.call_later(WAKEUP_SECS, + self.retry_event.set) + await self.retry_event.wait() + self.retry_event.clear() + timeout.cancel() + await self.retry_peers() + finally: + self.write_peers_file() - def rpc_data(self): - return self.irc_peers + def is_coin_onion_peer(self, peer): + '''Return true if this peer is a hard-coded onion peer.''' + return peer.is_tor and any(peer.host in real_name + for real_name in self.env.coin.PEERS) + + async def retry_peers(self): + '''Retry peers that are close to getting stale.''' + # Exponential backoff of retries + now = time.time() + nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2 + + def retry_peer(peer): + # Try some Tor at startup to determine the proxy so we can + # serve the right banner file + if self.last_tor_retry_time == 0 and self.is_coin_onion_peer(peer): + return True + # Retry a peer whose ports might have updated + if peer.other_port_pairs: + return True + # Retry a good connection if it is about to turn stale + if peer.try_count == 0: + return peer.last_connect < nearly_stale_time + # Retry a failed connection if enough time has passed + return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count + + peers = [peer for peer in self.peers if retry_peer(peer)] + + # If we don't have a tor proxy drop tor peers, but retry + # occasionally + if self.tor_proxy.port is None: + if now < self.last_tor_retry_time + 3600: + peers = [peer for peer in peers if not peer.is_tor] + elif any(peer.is_tor for peer in peers): + self.last_tor_retry_time = now + + for peer in peers: + peer.last_try = time.time() + peer.try_count += 1 + pairs = peer.connection_port_pairs() + if peer.bad or not pairs: + self.maybe_forget_peer(peer) + else: + await self.semaphore.acquire() + self.retry_peer(peer, pairs) + + def retry_peer(self, peer, port_pairs): + kind, port = port_pairs[0] + # Python 3.5.3: use PROTOCOL_TLS + sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) if kind == 'SSL' else None + + if peer.is_tor: + create_connection = self.tor_proxy.create_connection + else: + create_connection = self.loop.create_connection + + protocol_factory = partial(PeerSession, peer, self, kind) + coro = create_connection(protocol_factory, peer.host, port, ssl=sslc) + callback = partial(self.connection_done, peer, port_pairs) + self.ensure_future(coro, callback) + + def connection_done(self, peer, port_pairs, future): + '''Called when a connection attempt succeeds or fails. + + If failed, log it and try remaining port pairs. If none, + release the connection count semaphore. + ''' + exception = future.exception() + if exception: + kind, port = port_pairs[0] + self.logger.info('failed connecting to {} at {} port {:d}: {}' + .format(peer, kind, port, exception)) + port_pairs = port_pairs[1:] + if port_pairs: + self.retry_peer(peer, port_pairs) + else: + self.set_connection_status(peer, False) + self.semaphore.release() + + def connection_lost(self, session): + '''Called by the peer session when disconnected.''' + self.semaphore.release() + + def set_connection_status(self, peer, good): + '''Called when a connection succeeded or failed.''' + if good: + peer.try_count = 0 + peer.source = 'peer' + # Remove matching IP addresses + for match in peer.matches(self.peers): + if match != peer and peer.host == peer.ip_addr: + self.peers.remove(match) + else: + self.maybe_forget_peer(peer) + + def maybe_forget_peer(self, peer): + '''Forget the peer if appropriate, e.g. long-term unreachable.''' + if peer.bad: + forget = peer.last_connect < time.time() - STALE_SECS // 2 + else: + try_limit = 10 if peer.last_connect else 3 + forget = peer.try_count >= try_limit + + if forget: + desc = 'bad' if peer.bad else 'unreachable' + self.logger.info('forgetting {} peer: {}'.format(desc, peer)) + self.peers.discard(peer) - def on_peers_subscribe(self): - '''Returns the server peers as a list of (ip, host, details) tuples.''' - return list(self.irc_peers.values()) + return forget diff --git a/server/session.py b/server/session.py index 9f16b87..490fc1c 100644 --- a/server/session.py +++ b/server/session.py @@ -7,12 +7,13 @@ '''Classes for local RPC server and remote client TCP/SSL servers.''' +import codecs import time from functools import partial from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2 from server.daemon import DaemonError -from server.version import VERSION +import server.version as version class SessionBase(JSONSession): @@ -32,6 +33,7 @@ class SessionBase(JSONSession): self.env = controller.env self.daemon = self.bp.daemon self.client = 'unknown' + self.protocol_version = '1.0' self.anon_logs = self.env.anon_logs self.last_delay = 0 self.txs_sent = 0 @@ -42,6 +44,7 @@ class SessionBase(JSONSession): self.bw_time = self.start_time self.bw_interval = 3600 self.bw_used = 0 + self.peer_added = False def have_pending_items(self): '''Called each time the pending item queue goes from empty to having @@ -74,6 +77,7 @@ class SessionBase(JSONSession): def connection_lost(self, exc): '''Handle client disconnection.''' + super().connection_lost(exc) msg = '' if self.pause: msg += ' whilst paused' @@ -116,6 +120,8 @@ class ElectrumX(SessionBase): 'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.transaction.broadcast': self.transaction_broadcast, + 'server.add_peer': self.add_peer, + 'server.banner': self.banner, 'server.features': self.server_features, 'server.peers.subscribe': self.peers_subscribe, 'server.version': self.server_version, @@ -171,9 +177,19 @@ class ElectrumX(SessionBase): self.subscribe_height = True return self.height() + def add_peer(self, features): + '''Add a peer.''' + if self.peer_added: + return False + peer_mgr = self.controller.peer_mgr + peer_info = self.peer_info() + source = peer_info[0] if peer_info else 'unknown' + self.peer_added = peer_mgr.on_add_peer(features, source) + return self.peer_added + def peers_subscribe(self): '''Return the server peers as a list of (ip, host, details) tuples.''' - return self.controller.peer_mgr.on_peers_subscribe() + return self.controller.peer_mgr.on_peers_subscribe(self.is_tor()) async def address_subscribe(self, address): '''Subscribe to an address. @@ -190,18 +206,50 @@ class ElectrumX(SessionBase): def server_features(self): '''Returns a dictionary of server features.''' - peer_mgr = self.controller.peer_mgr - hosts = {identity.host: { - 'tcp_port': identity.tcp_port, - 'ssl_port': identity.ssl_port, - } for identity in peer_mgr.identities()} - - return { - 'hosts': hosts, - 'pruning': peer_mgr.pruning, - 'protocol_version': peer_mgr.PROTOCOL_VERSION, - 'server_version': VERSION, - } + return self.controller.peer_mgr.myself.features + + def is_tor(self): + '''Try to detect if the connection is to a tor hidden service we are + running.''' + tor_proxy = self.controller.peer_mgr.tor_proxy + peer_info = self.peer_info() + return peer_info and peer_info[0] == tor_proxy.ip_addr + + async def replaced_banner(self, banner): + network_info = await self.controller.daemon_request('getnetworkinfo') + ni_version = network_info['version'] + major, minor = divmod(ni_version, 1000000) + minor, revision = divmod(minor, 10000) + revision //= 100 + daemon_version = '{:d}.{:d}.{:d}'.format(major, minor, revision) + for pair in [ + ('$VERSION', version.VERSION), + ('$DAEMON_VERSION', daemon_version), + ('$DAEMON_SUBVERSION', network_info['subversion']), + ('$DONATION_ADDRESS', self.env.donation_address), + ]: + banner = banner.replace(*pair) + return banner + + async def banner(self): + '''Return the server banner text.''' + banner = 'Welcome to Electrum!' + + if self.is_tor(): + banner_file = self.env.tor_banner_file + else: + banner_file = self.env.banner_file + if banner_file: + try: + with codecs.open(banner_file, 'r', 'utf-8') as f: + banner = f.read() + except Exception as e: + self.log_error('reading banner file {}: {}' + .format(banner_file, e)) + else: + banner = await self.replaced_banner(banner) + + return banner def server_version(self, client_name=None, protocol_version=None): '''Returns the server version as a string. @@ -213,7 +261,7 @@ class ElectrumX(SessionBase): self.client = str(client_name)[:17] if protocol_version is not None: self.protocol_version = protocol_version - return VERSION + return version.VERSION async def transaction_broadcast(self, raw_tx): '''Broadcast a raw transaction to the network. diff --git a/server/storage.py b/server/storage.py index 074d8a3..6562d33 100644 --- a/server/storage.py +++ b/server/storage.py @@ -12,6 +12,7 @@ from functools import partial import lib.util as util + def db_class(name): '''Returns a DB engine class.''' for db_class in util.subclasses(Storage): diff --git a/server/version.py b/server/version.py index bb177d8..afe380c 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1,5 @@ -VERSION = "ElectrumX 0.10.19" +# Server name and protocol versions + +VERSION = 'ElectrumX 0.11.0' +PROTOCOL_MIN = '1.0' +PROTOCOL_MAX = '1.0' diff --git a/setup.py b/setup.py index 3a91425..b152c25 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setuptools.setup( # "irc" package is only required if IRC connectivity is enabled # via environment variables, in which case I've tested with 15.0.4 # "x11_hash" package (1.4) is required to sync DASH network. - install_requires=['plyvel', 'pylru', 'aiohttp >= 1'], + install_requires=['plyvel', 'pylru', 'irc', 'aiohttp >= 1'], packages=setuptools.find_packages(), description='ElectrumX Server', author='Neil Booth', diff --git a/tests/test_storage.py b/tests/test_storage.py index 910019e..afad2e1 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -44,8 +44,8 @@ def test_batch(db): def test_iterator(db): """ - The iterator should contain all key/value pairs starting with prefix ordered - by key. + The iterator should contain all key/value pairs starting with prefix + ordered by key. """ for i in range(5): db.put(b"abc" + str.encode(str(i)), str.encode(str(i))) diff --git a/tests/test_util.py b/tests/test_util.py index bb98c83..4142679 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -19,7 +19,6 @@ def test_cachedproperty(): cls.CALL_COUNT += 1 return cls.CALL_COUNT - t = Target() assert t.prop == t.prop == 1 assert Target.cls_prop == Target.cls_prop == 1 @@ -56,4 +55,4 @@ def test_chunks(): def test_increment_byte_string(): assert util.increment_byte_string(b'1') == b'2' assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' - assert util.increment_byte_string(b'\xff\xff') == None + assert util.increment_byte_string(b'\xff\xff') is None