Browse Source

Merge branch 'develop'

master 0.11.0
Neil Booth 8 years ago
parent
commit
782479e91c
  1. 2
      LICENCE
  2. 31
      README.rst
  3. 37
      docs/ENVIRONMENT.rst
  4. 4
      docs/PEER_DISCOVERY.rst
  5. 4
      docs/PROTOCOL.rst
  6. 29
      docs/RPC-INTERFACE.rst
  7. 9
      electrumx_rpc.py
  8. 3
      electrumx_server.py
  9. 47
      lib/coins.py
  10. 22
      lib/hash.py
  11. 141
      lib/jsonrpc.py
  12. 294
      lib/peer.py
  13. 21
      lib/script.py
  14. 181
      lib/socks.py
  15. 26
      lib/tx.py
  16. 47
      lib/util.py
  17. 1
      query.py
  18. 7
      server/block_processor.py
  19. 88
      server/controller.py
  20. 1
      server/daemon.py
  21. 1
      server/db.py
  22. 40
      server/env.py
  23. 12
      server/irc.py
  24. 12
      server/mempool.py
  25. 544
      server/peers.py
  26. 78
      server/session.py
  27. 1
      server/storage.py
  28. 6
      server/version.py
  29. 2
      setup.py
  30. 4
      tests/test_storage.py
  31. 3
      tests/test_util.py

2
LICENCE

@ -1,4 +1,4 @@
Copyright (c) 2016, Neil Booth Copyright (c) 2016-2017, Neil Booth
All rights reserved. All rights reserved.

31
README.rst

@ -8,7 +8,7 @@ ElectrumX - Reimplementation of electrum-server
=============================================== ===============================================
:Licence: MIT :Licence: MIT
:Language: Python (>= 3.5) :Language: Python (>= 3.5.1)
:Author: Neil Booth :Author: Neil Booth
Getting Started Getting Started
@ -42,6 +42,7 @@ Features
- Daemon failover. More than one daemon can be specified, and - Daemon failover. More than one daemon can be specified, and
ElectrumX will failover round-robin style if the current one fails ElectrumX will failover round-robin style if the current one fails
for any reason. for any reason.
- peer discovery protocol removes need for IRC
- Coin abstraction makes compatible altcoin and testnet support easy. - Coin abstraction makes compatible altcoin and testnet support easy.
Motivation Motivation
@ -113,8 +114,8 @@ and associated complications.
Roadmap Pre-1.0 Roadmap Pre-1.0
=============== ===============
- minor code cleanups. - minor code cleanups
- implement simple protocol to discover peers without resorting to IRC. - fixes for a couple of outstanding issues
Roadmap Post-1.0 Roadmap Post-1.0
================ ================
@ -128,18 +129,33 @@ Roadmap Post-1.0
Database Format Database Format
=============== ===============
The database format of ElectrumX is unlikely to change from the 0.10.0 The database format of ElectrumX will not change from the 0.10.0
version prior to the release of 1.0. version for the release of 1.0.
ChangeLog ChangeLog
========= =========
Version 0.11.0
--------------
* implementation of `docs/PEER_DISCOVERY.rst`_ for discovery of server
peers without using IRC. Closes `#104`_. Since all testnet peers
are ElectrumX servers, IRC advertising is now disabled on bitcoin
testnet.
Thanks to bauerj, hsmiths and JWU42 for their help testing these
changes over the last month.
* you can now specify a tor proxy (or have it autodetected if local),
and if an incoming connection seems to be from the proxy a
tor-specific banner file is served. See **TOR_BANNER_FILE** in
`docs/ENVIRONMENT.rst`_.
Version 0.10.19 Version 0.10.19
--------------- ---------------
* update `docs/PEER_DISCOVERY.rst`_ * update `docs/PEER_DISCOVERY.rst`_
* accept IPv6 addresses in DAEMON_URL (fixes #126) * accept IPv6 addresses in DAEMON_URL (fixes `#126`_)
Version 0.10.18 Version 0.10.18
--------------- ---------------
@ -312,7 +328,10 @@ stability please stick with the 0.9 series.
.. _#101: https://github.com/kyuupichan/electrumx/issues/101 .. _#101: https://github.com/kyuupichan/electrumx/issues/101
.. _#102: https://github.com/kyuupichan/electrumx/issues/102 .. _#102: https://github.com/kyuupichan/electrumx/issues/102
.. _#103: https://github.com/kyuupichan/electrumx/issues/103 .. _#103: https://github.com/kyuupichan/electrumx/issues/103
.. _#104: https://github.com/kyuupichan/electrumx/issues/104
.. _#110: https://github.com/kyuupichan/electrumx/issues/110 .. _#110: https://github.com/kyuupichan/electrumx/issues/110
.. _#111: https://github.com/kyuupichan/electrumx/issues/111 .. _#111: https://github.com/kyuupichan/electrumx/issues/111
.. _#126: https://github.com/kyuupichan/electrumx/issues/126
.. _docs/HOWTO.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/HOWTO.rst .. _docs/HOWTO.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/HOWTO.rst
.. _docs/ENVIRONMENT.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/ENVIRONMENT.rst
.. _docs/PEER_DISCOVERY.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/PEER_DISCOVERY.rst .. _docs/PEER_DISCOVERY.rst: https://github.com/kyuupichan/electrumx/blob/master/docs/PEER_DISCOVERY.rst

37
docs/ENVIRONMENT.rst

@ -122,6 +122,11 @@ These environment variables are optional:
+ **$DONATION_ADDRESS** is replaced with the address from the + **$DONATION_ADDRESS** is replaced with the address from the
**DONATION_ADDRESS** environment variable. **DONATION_ADDRESS** environment variable.
* **TOR_BANNER_FILE**
As for **BANNER_FILE** (which is also the default) but shown to
incoming connections believed to be to your Tor hidden service.
* **ANON_LOGS** * **ANON_LOGS**
Set to anything non-empty to replace IP addresses in logs with Set to anything non-empty to replace IP addresses in logs with
@ -207,6 +212,33 @@ raise them.
functioning Electrum clients by default will send pings roughly functioning Electrum clients by default will send pings roughly
every 60 seconds. every 60 seconds.
TOR
---
In response to the `server.peers.subscribe` RPC call, ElectrumX will
only return peer servers that is has recently connected to and
verified basic functionality.
If you are not running a Tor proxy ElectrumX will be unable to connect
to onion server peers, in which case rather than returning no onion
peers it will fall back to a hard-coded list.
To give incoming clients a full range of onion servers you will need
to be running a Tor proxy for ElectrumX to use.
* **TOR_PROXY_HOST**
The host where the Tor proxy is running. Defaults to *127.0.0.1*.
If you use a hostname here rather than an IP address, you must have
Python version >= 3.5.3, Python 3.5.2 will **not** work.
* **TOR_PROXY_PORT**
The port on which the Tor proxy is running. If not set, ElectrumX
will autodetect any proxy running on the usual ports 9050 (Tor),
9150 (Tor browser bundle) and 1080 (socks).
IRC IRC
--- ---
@ -255,6 +287,11 @@ connectivity on IRC:
unless it is '0', otherwise **SSL_PORT**. '0' disables publishing unless it is '0', otherwise **SSL_PORT**. '0' disables publishing
the port. the port.
**NOTE**: Certificate-Authority signed certificates don't work over
Tor, so you should set **REPORT_SSL_PORT_TOR** to 0 if yours is not
self-signed.
Cache Cache
----- -----

4
docs/PEER_DISCOVERY.rst

@ -160,8 +160,8 @@ Unknown keys should be silently ignored.
* **protocol_min** * **protocol_min**
Strings that are the minimum and maximum Electrum protcol versions Strings that are the minimum and maximum Electrum protcol versions
this server speaks. Should be the same as what would suffix the this server speaks. The maximum value should be the same as what
letter **v** in the IRC real name. Example: "1.1". would suffix the letter **v** in the IRC real name. Example: "1.1".
* **pruning** * **pruning**

4
docs/PROTOCOL.rst

@ -723,8 +723,8 @@ Get a list of features and services supported by the server.
* **pruning** * **pruning**
The history pruning limit of the server. If the server does not The history pruning limit of the server as an integer. If the
prune return *null*. server does not prune return *null*.
**Example Response** **Example Response**

29
docs/RPC-INTERFACE.rst

@ -62,10 +62,10 @@ The following commands are available:
Sessions are put into groups, primarily as an anti-DoS measure. Sessions are put into groups, primarily as an anti-DoS measure.
Initially all connections made within a period of time are put in Initially all connections made within a period of time are put in
the same group. High bandwidth usage by a member of a group the same group. High bandwidth usage by a member of a group
deprioritizes itself, and all members of its group to a lesser deprioritizes that session, and all members of its group to a lesser
extent. Low-priority sessions have their requests served after extent. Low-priority sessions have their requests served after
higher priority sessions. ElectrumX will start delaying responses higher priority sessions. ElectrumX will start delaying responses
to a sessions if it becomes sufficiently deprioritized. to a session if it becomes sufficiently deprioritized.
* **sessions** * **sessions**
@ -125,7 +125,7 @@ The following commands are available:
ElectrumX initiates the socket close process for the passed ElectrumX initiates the socket close process for the passed
sessions. Whilst most connections close quickly, it can take sessions. Whilst most connections close quickly, it can take
several minutes for Python to close some SSL connections down. several minutes for Python to shut some SSL connections down.
* **log** * **log**
@ -153,22 +153,25 @@ The following commands are available:
Returns a list of peer electrum servers. This command takes no arguments. Returns a list of peer electrum servers. This command takes no arguments.
Currently this is data gleaned from an IRC session. Currently peer data is obtained via a peer discovery protocol; it
used to be taken from IRC.
* **daemon_url** * **daemon_url**
This command takes an option argument that is interpreted This command takes an optional argument that is interpreted
identically to the **DAEMON_URL** environment variable. If default identically to the **DAEMON_URL** environment variable. If omitted,
value of the argument is the **DAEMON_URL** environment variable. the default argument value is the process's **DAEMON_URL**
environment variable.
The command replaces the daemon's URL at run-time, and rotates to the This command replaces the daemon's URL at run-time, and also
first in the list. forecefully rotates to the first URL in the list.
For example, in case ElectrumX has rotated to a secondary daemon and For example, in case ElectrumX has previously failed over to a
you want to revert to the first after fixing the issue, call this secondary daemon and you want to revert to the primary having
command without an argument. resolved the connectivity issue, invoking this command without an
argument will have that effect.
* **reorg** * **reorg**
Force a block chain reorg. This command takes an optional Force a block chain reorg. This command takes an optional
argument - the number of blocks to reorg - that defaults to 3. argument - the number of blocks to reorg - which defaults to 3.

9
electrumx_rpc.py

@ -40,12 +40,9 @@ class RPCClient(JSONSession):
self.send_request(handler, method, params) self.send_request(handler, method, params)
def handle_response(self, method, result, error): def handle_response(self, method, result, error):
if method in ('groups', 'sessions') and not error: if method in ('groups', 'peers', 'sessions') and not error:
if method == 'groups': lines_func = getattr(Controller, '{}_text_lines'.format(method))
lines = Controller.groups_text_lines(result) for line in lines_func(result):
else:
lines = Controller.sessions_text_lines(result)
for line in lines:
print(line) print(line)
elif error: elif error:
print('error: {} (code {:d})' print('error: {} (code {:d})'

3
electrumx_server.py

@ -25,6 +25,7 @@ SUPPRESS_MESSAGES = [
'Fatal write error on socket transport', 'Fatal write error on socket transport',
] ]
def main_loop(): def main_loop():
'''Start the server.''' '''Start the server.'''
if os.geteuid() == 0: if os.geteuid() == 0:
@ -43,7 +44,7 @@ def main_loop():
def on_exception(loop, context): def on_exception(loop, context):
'''Suppress spurious messages it appears we cannot control.''' '''Suppress spurious messages it appears we cannot control.'''
message = context.get('message') message = context.get('message')
if not message in SUPPRESS_MESSAGES: if message not in SUPPRESS_MESSAGES:
if not ('task' in context and if not ('task' in context and
'accept_connection2()' in repr(context.get('task'))): 'accept_connection2()' in repr(context.get('task'))):
loop.default_exception_handler(context) loop.default_exception_handler(context)

47
lib/coins.py

@ -2,8 +2,26 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
# and warranty status of this software. #
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''Module providing coin abstraction. '''Module providing coin abstraction.
@ -37,6 +55,7 @@ class Coin(object):
RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?') RPC_URL_REGEX = re.compile('.+@(\[[0-9a-fA-F:]+\]|[^:]+)(:[0-9]+)?')
VALUE_PER_COIN = 100000000 VALUE_PER_COIN = 100000000
CHUNK_SIZE = 2016 CHUNK_SIZE = 2016
IRC_PREFIX = None
IRC_SERVER = "irc.freenode.net" IRC_SERVER = "irc.freenode.net"
IRC_PORT = 6667 IRC_PORT = 6667
HASHX_LEN = 11 HASHX_LEN = 11
@ -50,10 +69,10 @@ class Coin(object):
Raise an exception if unrecognised.''' Raise an exception if unrecognised.'''
req_attrs = ('TX_COUNT', 'TX_COUNT_HEIGHT', 'TX_PER_BLOCK', req_attrs = ('TX_COUNT', 'TX_COUNT_HEIGHT', 'TX_PER_BLOCK',
'IRC_CHANNEL', 'IRC_PREFIX') 'IRC_CHANNEL')
for coin in util.subclasses(Coin): for coin in util.subclasses(Coin):
if (coin.NAME.lower() == name.lower() if (coin.NAME.lower() == name.lower() and
and coin.NET.lower() == net.lower()): coin.NET.lower() == net.lower()):
missing = [attr for attr in req_attrs missing = [attr for attr in req_attrs
if not hasattr(coin, attr)] if not hasattr(coin, attr)]
if missing: if missing:
@ -285,25 +304,28 @@ class Bitcoin(Coin):
IRC_CHANNEL = "#electrum" IRC_CHANNEL = "#electrum"
RPC_PORT = 8332 RPC_PORT = 8332
PEERS = [ PEERS = [
'4cii7ryno5j3axe4.onion t'
'btc.smsys.me s995', 'btc.smsys.me s995',
'ca6ulp2j2mpsft3y.onion s t', 'ca6ulp2j2mpsft3y.onion s t',
'electrum.be s t', 'electrum.be s t',
'electrum.trouth.net s t', 'electrum.trouth.net p10000 s t',
'electrum.vom-stausee.de s t', 'electrum.vom-stausee.de s t',
'electrum3.hachre.de s t', 'electrum3.hachre.de p10000 s t',
'electrum.hsmiths.com s t', 'electrum.hsmiths.com s t',
'erbium1.sytes.net s t', 'erbium1.sytes.net s t',
'h.1209k.com s t', 'fdkbwjykvl2f3hup.onion p10000 s t',
'h.1209k.com p10000 s t',
'helicarrier.bauerj.eu s t', 'helicarrier.bauerj.eu s t',
'hsmiths4fyqlw5xw.onion s t',
'ozahtqwp25chjdjd.onion s t', 'ozahtqwp25chjdjd.onion s t',
'us11.einfachmalnettsein.de s t', 'us11.einfachmalnettsein.de s t',
'ELEX01.blackpole.online s t',
] ]
class BitcoinTestnet(Bitcoin): class BitcoinTestnet(Bitcoin):
SHORTNAME = "XTN" SHORTNAME = "XTN"
NET = "testnet" NET = "testnet"
IRC_PREFIX = None
XPUB_VERBYTES = bytes.fromhex("043587cf") XPUB_VERBYTES = bytes.fromhex("043587cf")
XPRV_VERBYTES = bytes.fromhex("04358394") XPRV_VERBYTES = bytes.fromhex("04358394")
P2PKH_VERBYTE = 0x6f P2PKH_VERBYTE = 0x6f
@ -315,16 +337,15 @@ class BitcoinTestnet(Bitcoin):
TX_COUNT = 12242438 TX_COUNT = 12242438
TX_COUNT_HEIGHT = 1035428 TX_COUNT_HEIGHT = 1035428
TX_PER_BLOCK = 21 TX_PER_BLOCK = 21
IRC_PREFIX = "ET_"
RPC_PORT = 18332 RPC_PORT = 18332
PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'} PEER_DEFAULT_PORTS = {'t': '51001', 's': '51002'}
PEERS = [ PEERS = [
'electrum.akinbo.org s t', 'electrum.akinbo.org s t',
'he36kyperp3kbuxu.onion s t', 'he36kyperp3kbuxu.onion s t',
'electrum-btc-testnet.petrkr.net s t', 'electrum-btc-testnet.petrkr.net s t',
'testnet.hsmiths.com t53011 s53012', 'testnet.hsmiths.com t53011',
'hsmithsxurybd7uh.onion t53011', 'hsmithsxurybd7uh.onion t53011 s53012',
'testnet.not.fyi s t', 'ELEX05.blackpole.online t52001 s52002',
] ]

22
lib/hash.py

@ -2,8 +2,26 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
# and warranty status of this software. #
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''Cryptograph hash functions and related classes.''' '''Cryptograph hash functions and related classes.'''

141
lib/jsonrpc.py

@ -2,8 +2,26 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
# and warranty status of this software. #
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''Classes for acting as a peer over a transport and speaking the JSON '''Classes for acting as a peer over a transport and speaking the JSON
RPC versions 1.0 and 2.0. RPC versions 1.0 and 2.0.
@ -41,9 +59,35 @@ class JSONRPC(object):
INVALID_ARGS = -32602 INVALID_ARGS = -32602
INTERNAL_ERROR = -32603 INTERNAL_ERROR = -32603
# Codes for this library
INVALID_RESPONSE = -100
ERROR_CODE_UNAVAILABLE = -101
REQUEST_TIMEOUT = -102
ID_TYPES = (type(None), str, numbers.Number) ID_TYPES = (type(None), str, numbers.Number)
HAS_BATCHES = False HAS_BATCHES = False
@classmethod
def canonical_error(cls, error):
'''Convert an error to a JSON RPC 2.0 error.
Handlers then only have a single form of error to deal with.
'''
if isinstance(error, int):
error = {'code': error}
elif isinstance(error, str):
error = {'message': error}
elif not isinstance(error, dict):
error = {'data': error}
error['code'] = error.get('code', JSONRPC.ERROR_CODE_UNAVAILABLE)
error['message'] = error.get('message', 'error message unavailable')
return error
@classmethod
def timeout_error(cls):
return {'message': 'request timed out',
'code': JSONRPC.REQUEST_TIMEOUT}
class JSONRPCv1(JSONRPC): class JSONRPCv1(JSONRPC):
'''JSON RPC version 1.0.''' '''JSON RPC version 1.0.'''
@ -71,23 +115,23 @@ class JSONRPCv1(JSONRPC):
@classmethod @classmethod
def handle_response(cls, handler, payload): def handle_response(cls, handler, payload):
'''JSON v1 response handler. Both 'error' and 'response' '''JSON v1 response handler. Both 'error' and 'result'
should exist with exactly one being None. should exist with exactly one being None.
Unfortunately many 1.0 clients behave like 2.0, and just send
one or the other.
''' '''
try: error = payload.get('error')
result = payload['result'] if error is None:
error = payload['error'] handler(payload.get('result'), None)
except KeyError:
pass
else: else:
if (result is None) != (error is None): handler(None, cls.canonical_error(error))
handler(result, error)
@classmethod @classmethod
def is_request(cls, payload): def is_request(cls, payload):
'''Returns True if the payload (which has a method) is a request. '''Returns True if the payload (which has a method) is a request.
False means it is a notification.''' False means it is a notification.'''
return payload.get('id') != None return payload.get('id') is not None
class JSONRPCv2(JSONRPC): class JSONRPCv2(JSONRPC):
@ -124,17 +168,17 @@ class JSONRPCv2(JSONRPC):
@classmethod @classmethod
def handle_response(cls, handler, payload): def handle_response(cls, handler, payload):
'''JSON v2 response handler. Exactly one of 'error' and 'response' '''JSON v2 response handler. Exactly one of 'error' and 'result'
must exist. Errors must have 'code' and 'message' members. must exist. Errors must have 'code' and 'message' members.
''' '''
if ('error' in payload) != ('result' in payload): if 'error' in payload:
if 'result' in payload: handler(None, cls.canonical_error(payload['error']))
elif 'result' in payload:
handler(payload['result'], None) handler(payload['result'], None)
else: else:
error = payload['error'] error = {'message': 'no error or result returned',
if (isinstance(error, dict) and 'code' in error 'code': JSONRPC.INVALID_RESPONSE}
and 'message' in error): handler(None, cls.canonical_error(error))
handler(None, error)
@classmethod @classmethod
def batch_size(cls, parts): def batch_size(cls, parts):
@ -213,15 +257,49 @@ class JSONSessionBase(util.LoggedClass):
responses. Incoming messages are queued. When the queue goes responses. Incoming messages are queued. When the queue goes
from empty from empty
''' '''
_next_session_id = 0
NEXT_SESSION_ID = 0 _pending_reqs = {}
@classmethod @classmethod
def next_session_id(cls): def next_session_id(cls):
session_id = cls.NEXT_SESSION_ID '''Return the next unique session ID.'''
cls.NEXT_SESSION_ID += 1 session_id = cls._next_session_id
cls._next_session_id += 1
return session_id return session_id
def _pending_request_keys(self):
'''Return a generator of pending request keys for this session.'''
return [key for key in self._pending_reqs if key[0] is self]
def has_pending_requests(self):
'''Return True if this session has pending requests.'''
return bool(self._pending_request_keys())
def pop_response_handler(self, msg_id):
'''Return the response handler for the given message ID.'''
return self._pending_reqs.pop((self, msg_id), (None, None))[0]
def timeout_session(self):
'''Trigger timeouts for all of the session's pending requests.'''
self._timeout_requests(self._pending_request_keys())
@classmethod
def timeout_check(cls):
'''Trigger timeouts where necessary for all pending requests.'''
now = time.time()
keys = [key for key, value in cls._pending_reqs.items()
if value[1] < now]
cls._timeout_requests(keys)
@classmethod
def _timeout_requests(cls, keys):
'''Trigger timeouts for the given lookup keys.'''
values = [cls._pending_reqs.pop(key) for key in keys]
handlers = [handler for handler, timeout in values]
timeout_error = JSONRPC.timeout_error()
for handler in handlers:
handler(None, timeout_error)
def __init__(self, version=JSONRPCCompat): def __init__(self, version=JSONRPCCompat):
super().__init__() super().__init__()
@ -245,7 +323,6 @@ class JSONSessionBase(util.LoggedClass):
self.batch_results = [] self.batch_results = []
# Handling of outgoing requests # Handling of outgoing requests
self.next_request_id = 0 self.next_request_id = 0
self.pending_responses = {}
# If buffered incoming data exceeds this the connection is closed # If buffered incoming data exceeds this the connection is closed
self.max_buffer_size = 1000000 self.max_buffer_size = 1000000
self.max_send = 50000 self.max_send = 50000
@ -296,7 +373,7 @@ class JSONSessionBase(util.LoggedClass):
'''Extract and return the ID from the payload. '''Extract and return the ID from the payload.
Raises an RPCError if it is missing or invalid.''' Raises an RPCError if it is missing or invalid.'''
if not 'id' in payload: if 'id' not in payload:
raise RPCError('missing id', JSONRPC.INVALID_REQUEST) raise RPCError('missing id', JSONRPC.INVALID_REQUEST)
id_ = payload['id'] id_ = payload['id']
@ -473,7 +550,7 @@ class JSONSessionBase(util.LoggedClass):
'''Handle a single JSON response.''' '''Handle a single JSON response.'''
try: try:
id_ = self.check_payload_id(payload) id_ = self.check_payload_id(payload)
handler = self.pending_responses.pop(id_, None) handler = self.pop_response_handler(id_)
if handler: if handler:
self.version.handle_response(handler, payload) self.version.handle_response(handler, payload)
else: else:
@ -593,14 +670,18 @@ class JSONSessionBase(util.LoggedClass):
'''Send a JSON error.''' '''Send a JSON error.'''
self.send_binary(self.error_bytes(message, code, id_)) self.send_binary(self.error_bytes(message, code, id_))
def send_request(self, handler, method, params=None): def send_request(self, handler, method, params=None, timeout=30):
'''Sends a request and arranges for handler to be called with the '''Sends a request and arranges for handler to be called with the
response when it comes in. response when it comes in.
A call to request_timeout_check() will result in pending
responses that have been waiting more than timeout seconds to
call the handler with a REQUEST_TIMEOUT error.
''' '''
id_ = self.next_request_id id_ = self.next_request_id
self.next_request_id += 1 self.next_request_id += 1
self.send_binary(self.request_bytes(id_, method, params)) self.send_binary(self.request_bytes(id_, method, params))
self.pending_responses[id_] = handler self._pending_reqs[(self, id_)] = (handler, time.time() + timeout)
def send_notification(self, method, params=None): def send_notification(self, method, params=None):
'''Send a notification.''' '''Send a notification.'''
@ -679,7 +760,9 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
def peer_info(self): def peer_info(self):
'''Returns information about the peer.''' '''Returns information about the peer.'''
if self.transport:
return self.transport.get_extra_info('peername') return self.transport.get_extra_info('peername')
return None
def abort(self): def abort(self):
'''Cut the connection abruptly.''' '''Cut the connection abruptly.'''
@ -691,6 +774,10 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
self.transport = transport self.transport = transport
super().connection_made() super().connection_made()
def connection_lost(self, exc):
'''Trigger timeouts of all pending requests.'''
self.timeout_session()
def is_closing(self): def is_closing(self):
'''True if the underlying transport is closing.''' '''True if the underlying transport is closing.'''
return self.transport and self.transport.is_closing() return self.transport and self.transport.is_closing()

294
lib/peer.py

@ -0,0 +1,294 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''Representation of a peer server.'''
import re
from ipaddress import ip_address
from lib.util import cachedproperty
class Peer(object):
# Protocol version
VERSION_REGEX = re.compile('[0-9]+(\.[0-9]+)?$')
ATTRS = ('host', 'features',
# metadata
'source', 'ip_addr', 'good_ports',
'last_connect', 'last_try', 'try_count')
PORTS = ('ssl_port', 'tcp_port')
FEATURES = PORTS + ('pruning', 'server_version',
'protocol_min', 'protocol_max')
# This should be set by the application
DEFAULT_PORTS = {}
def __init__(self, host, features, source='unknown', ip_addr=None,
good_ports=[], last_connect=0, last_try=0, try_count=0):
'''Create a peer given a host name (or IP address as a string),
a dictionary of features, and a record of the source.'''
assert isinstance(host, str)
assert isinstance(features, dict)
self.host = host
self.features = features.copy()
# Canonicalize / clean-up
for feature in self.FEATURES:
self.features[feature] = getattr(self, feature)
# Metadata
self.source = source
self.ip_addr = ip_addr
self.good_ports = good_ports.copy()
self.last_connect = last_connect
self.last_try = last_try
self.try_count = try_count
# Transient, non-persisted metadata
self.bad = False
self.other_port_pairs = set()
@classmethod
def peers_from_features(cls, features, source):
peers = []
if isinstance(features, dict):
hosts = features.get('hosts')
if isinstance(hosts, dict):
peers = [Peer(host, features, source=source)
for host in hosts if isinstance(host, str)]
return peers
@classmethod
def deserialize(cls, item):
'''Deserialize from a dictionary.'''
return cls(**item)
@classmethod
def version_tuple(cls, vstr):
'''Convert a version string, such as "1.2", to a (major_version,
minor_version) pair.
'''
if isinstance(vstr, str) and VERSION_REGEX.match(vstr):
if '.' not in vstr:
vstr += '.0'
else:
vstr = '1.0'
return tuple(int(part) for part in vstr.split('.'))
def matches(self, peers):
'''Return peers whose host matches the given peer's host or IP
address. This results in our favouring host names over IP
addresses.
'''
candidates = (self.host.lower(), self.ip_addr)
return [peer for peer in peers if peer.host.lower() in candidates]
def __str__(self):
return self.host
def update_features(self, features):
'''Update features in-place.'''
tmp = Peer(self.host, features)
self.features = tmp.features
for feature in self.FEATURES:
setattr(self, feature, getattr(tmp, feature))
def connection_port_pairs(self):
'''Return a list of (kind, port) pairs to try when making a
connection.'''
# Use a list not a set - it's important to try the registered
# ports first.
pairs = [('SSL', self.ssl_port), ('TCP', self.tcp_port)]
while self.other_port_pairs:
pairs.append(other_port_pairs.pop())
return [pair for pair in pairs if pair[1]]
def mark_bad(self):
'''Mark as bad to avoid reconnects but also to remember for a
while.'''
self.bad = True
def check_ports(self, other):
'''Remember differing ports in case server operator changed them
or removed one.'''
if other.ssl_port != self.ssl_port:
self.other_port_pairs.add(('SSL', other.ssl_port))
if other.tcp_port != self.tcp_port:
self.other_port_pairs.add(('TCP', other.tcp_port))
return bool(self.other_port_pairs)
@cachedproperty
def is_tor(self):
return self.host.endswith('.onion')
@cachedproperty
def is_valid(self):
ip = self.ip_address
if not ip:
return True
return not ip.is_multicast and (ip.is_global or ip.is_private)
@cachedproperty
def is_public(self):
ip = self.ip_address
return self.is_valid and not (ip and ip.is_private)
@cachedproperty
def ip_address(self):
'''The host as a python ip_address object, or None.'''
try:
return ip_address(self.host)
except ValueError:
return None
def bucket(self):
if self.is_tor:
return 'onion'
if not self.ip_addr:
return ''
return tuple(self.ip_addr.split('.')[:2])
def serialize(self):
'''Serialize to a dictionary.'''
return {attr: getattr(self, attr) for attr in self.ATTRS}
def _port(self, key):
hosts = self.features.get('hosts')
if isinstance(hosts, dict):
host = hosts.get(self.host)
port = self._integer(key, host)
if port and 0 < port < 65536:
return port
return None
def _integer(self, key, d=None):
d = d or self.features
result = d.get(key) if isinstance(d, dict) else None
if isinstance(result, str):
try:
result = int(result)
except ValueError:
pass
return result if isinstance(result, int) else None
def _string(self, key):
result = self.features.get(key)
return result if isinstance(result, str) else None
def _version_string(self, key):
version = self.features.get(key)
return '{:d}.{:d}'.format(*self.version_tuple(version))
@cachedproperty
def genesis_hash(self):
'''Returns None if no SSL port, otherwise the port as an integer.'''
return self._string('genesis_hash')
@cachedproperty
def ssl_port(self):
'''Returns None if no SSL port, otherwise the port as an integer.'''
return self._port('ssl_port')
@cachedproperty
def tcp_port(self):
'''Returns None if no TCP port, otherwise the port as an integer.'''
return self._port('tcp_port')
@cachedproperty
def server_version(self):
'''Returns the server version as a string if known, otherwise None.'''
return self._string('server_version')
@cachedproperty
def pruning(self):
'''Returns the pruning level as an integer. None indicates no
pruning.'''
pruning = self._integer('pruning')
if pruning and pruning > 0:
return pruning
return None
@cachedproperty
def protocol_min(self):
'''Minimum protocol version as a string, e.g., 1.0'''
return self._version_string('protcol_min')
@cachedproperty
def protocol_max(self):
'''Maximum protocol version as a string, e.g., 1.1'''
return self._version_string('protcol_max')
def to_tuple(self):
'''The tuple ((ip, host, details) expected in response
to a peers subscription.'''
details = self.real_name().split()[1:]
return (self.ip_addr or self.host, self.host, details)
def real_name(self, host_override=None):
'''Real name of this peer as used on IRC.'''
def port_text(letter, port):
if port == self.DEFAULT_PORTS.get(letter):
return letter
else:
return letter + str(port)
parts = [host_override or self.host, 'v' + self.protocol_max]
if self.pruning:
parts.append('p{:d}'.format(self.pruning))
for letter, port in (('s', self.ssl_port), ('t', self.tcp_port)):
if port:
parts.append(port_text(letter, port))
return ' '.join(parts)
@classmethod
def from_real_name(cls, real_name, source):
'''Real name is a real name as on IRC, such as
"erbium1.sytes.net v1.0 s t"
Returns an instance of this Peer class.
'''
host = 'nohost'
features = {}
ports = {}
for n, part in enumerate(real_name.split()):
if n == 0:
host = part
continue
if part[0] in ('s', 't'):
if len(part) == 1:
port = cls.DEFAULT_PORTS[part[0]]
else:
port = part[1:]
if part[0] == 's':
ports['ssl_port'] = port
else:
ports['tcp_port'] = port
elif part[0] == 'v':
features['protocol_max'] = features['protocol_min'] = part[1:]
elif part[0] == 'p':
features['pruning'] = part[1:]
features.update(ports)
features['hosts'] = {host: ports}
return cls(host, features, source)

21
lib/script.py

@ -2,7 +2,26 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software. # and warranty status of this software.
'''Script-related classes and functions.''' '''Script-related classes and functions.'''

181
lib/socks.py

@ -0,0 +1,181 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software.
'''Socks proxying.'''
import asyncio
import ipaddress
import logging
import socket
import struct
from functools import partial
import lib.util as util
class Socks(util.LoggedClass):
'''Socks protocol wrapper.'''
SOCKS5_ERRORS = {
1: 'general SOCKS server failure',
2: 'connection not allowed by ruleset',
3: 'network unreachable',
4: 'host unreachable',
5: 'connection refused',
6: 'TTL expired',
7: 'command not supported',
8: 'address type not supported',
}
class Error(Exception):
pass
def __init__(self, loop, sock, host, port):
super().__init__()
self.loop = loop
self.sock = sock
self.host = host
self.port = port
try:
self.ip_address = ipaddress.ip_address(host)
except ValueError:
self.ip_address = None
self.debug = False
async def _socks4_handshake(self):
if self.ip_address:
# Socks 4
ip_addr = self.ip_address
host_bytes = b''
else:
# Socks 4a
ip_addr = ipaddress.ip_address('0.0.0.1')
host_bytes = self.host.encode() + b'\0'
user_id = ''
data = b'\4\1' + struct.pack('>H', self.port) + ip_addr.packed
data += user_id.encode() + b'\0' + host_bytes
await self.loop.sock_sendall(self.sock, data)
data = await self.loop.sock_recv(self.sock, 8)
if data[0] != 0:
raise self.Error('proxy sent bad initial Socks4 byte')
if data[1] != 0x5a:
raise self.Error('proxy request failed or rejected')
async def _socks5_handshake(self):
await self.loop.sock_sendall(self.sock, b'\5\1\0')
data = await self.loop.sock_recv(self.sock, 2)
if data[0] != 5:
raise self.Error('proxy sent bad SOCKS5 initial byte')
if data[1] != 0:
raise self.Error('proxy rejected SOCKS5 authentication method')
if self.ip_address:
if self.ip_address.version == 4:
addr = b'\1' + self.ip_address.packed
else:
addr = b'\4' + self.ip_address.packed
else:
host = self.host.encode()
addr = b'\3' + bytes([len(host)]) + host
data = b'\5\1\0' + addr + struct.pack('>H', self.port)
await self.loop.sock_sendall(self.sock, data)
data = await self.loop.sock_recv(self.sock, 5)
if data[0] != 5:
raise self.Error('proxy sent bad SOSCK5 response initial byte')
if data[1] != 0:
msg = self.SOCKS5_ERRORS.get(data[1], 'unknown SOCKS5 error {:d}'
.format(data[1]))
raise self.Error(msg)
if data[3] == 1:
addr_len, data = 3, data[4:]
elif data[3] == 3:
addr_len, data = data[4], b''
elif data[3] == 4:
addr_len, data = 15, data[4:]
data = await self.loop.sock_recv(self.sock, addr_len + 2)
addr = data[:addr_len]
port, = struct.unpack('>H', data[-2:])
async def handshake(self):
'''Write the proxy handshake sequence.'''
if self.ip_address and self.ip_address.version == 6:
await self._socks5_handshake()
else:
await self._socks4_handshake()
if self.debug:
address = (self.host, self.port)
self.log_info('successful connection via proxy to {}'
.format(util.address_string(address)))
class SocksProxy(util.LoggedClass):
def __init__(self, host, port, loop=None):
'''Host can be an IPv4 address, IPv6 address, or a host name.'''
super().__init__()
self.host = host
self.port = port
self.ip_addr = None
self.loop = loop or asyncio.get_event_loop()
async def create_connection(self, protocol_factory, host, port, ssl=None):
'''All arguments are as to asyncio's create_connection method.'''
if self.port is None:
proxy_ports = [9050, 9150, 1080]
else:
proxy_ports = [self.port]
for proxy_port in proxy_ports:
address = (self.host, proxy_port)
sock = socket.socket()
sock.setblocking(False)
try:
await self.loop.sock_connect(sock, address)
except OSError as e:
if proxy_port == proxy_ports[-1]:
raise
continue
socks = Socks(self.loop, sock, host, port)
try:
await socks.handshake()
if self.port is None:
self.ip_addr = sock.getpeername()[0]
self.port = proxy_port
self.logger.info('detected proxy at {} ({})'
.format(util.address_string(address),
self.ip_addr))
break
except Exception as e:
sock.close()
raise
hostname = host if ssl else None
return await self.loop.create_connection(
protocol_factory, ssl=ssl, sock=sock, server_hostname=hostname)

26
lib/tx.py

@ -2,7 +2,26 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software. # and warranty status of this software.
'''Transaction-related classes and functions.''' '''Transaction-related classes and functions.'''
@ -24,6 +43,7 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")):
# FIXME: add hash as a cached property? # FIXME: add hash as a cached property?
class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")): class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
'''Class representing a transaction input.''' '''Class representing a transaction input.'''
@ -32,8 +52,8 @@ class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
@cachedproperty @cachedproperty
def is_coinbase(self): def is_coinbase(self):
return (self.prev_hash == TxInput.ZERO return (self.prev_hash == TxInput.ZERO and
and self.prev_idx == TxInput.MINUS_1) self.prev_idx == TxInput.MINUS_1)
@cachedproperty @cachedproperty
def script_sig_info(self): def script_sig_info(self):

47
lib/util.py

@ -2,15 +2,34 @@
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software. # and warranty status of this software.
'''Miscellaneous utility classes and functions.''' '''Miscellaneous utility classes and functions.'''
import array import array
import asyncio
import inspect import inspect
from ipaddress import ip_address
import logging import logging
import sys import sys
from collections import Container, Mapping from collections import Container, Mapping
@ -109,11 +128,12 @@ def deep_getsizeof(obj):
return size(obj) return size(obj)
def subclasses(base_class, strict=True): def subclasses(base_class, strict=True):
'''Return a list of subclasses of base_class in its module.''' '''Return a list of subclasses of base_class in its module.'''
def select(obj): def select(obj):
return (inspect.isclass(obj) and issubclass(obj, base_class) return (inspect.isclass(obj) and issubclass(obj, base_class) and
and (not strict or obj != base_class)) (not strict or obj != base_class))
pairs = inspect.getmembers(sys.modules[base_class.__module__], select) pairs = inspect.getmembers(sys.modules[base_class.__module__], select)
return [pair[1] for pair in pairs] return [pair[1] for pair in pairs]
@ -202,3 +222,22 @@ def open_file(filename, create=False):
if create: if create:
return open(filename, 'wb+') return open(filename, 'wb+')
raise raise
def open_truncate(filename):
'''Open the file name. Return its handle.'''
return open(filename, 'wb+')
def address_string(address):
'''Return an address as a correctly formatted string.'''
fmt = '{}:{:d}'
host, port = address
try:
host = ip_address(host)
except ValueError:
pass
else:
if host.version == 6:
fmt = '[{}]:{:d}'
return fmt.format(host, port)

1
query.py

@ -71,5 +71,6 @@ def main():
print('Balance: {} {}'.format(coin.decimal_value(balance), print('Balance: {} {}'.format(coin.decimal_value(balance),
coin.SHORTNAME)) coin.SHORTNAME))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

7
server/block_processor.py

@ -87,10 +87,11 @@ class Prefetcher(LoggedClass):
with await self.semaphore: with await self.semaphore:
while self.cache_size < self.min_cache_size: while self.cache_size < self.min_cache_size:
# Try and catch up all blocks but limit to room in cache. # Try and catch up all blocks but limit to room in cache.
# Constrain fetch count to between 0 and 2500 regardless. # Constrain fetch count to between 0 and 500 regardless;
# testnet can be lumpy.
cache_room = self.min_cache_size // self.ave_size cache_room = self.min_cache_size // self.ave_size
count = min(daemon_height - self.fetched_height, cache_room) count = min(daemon_height - self.fetched_height, cache_room)
count = min(2500, max(count, 0)) count = min(500, max(count, 0))
if not count: if not count:
if not self.caught_up: if not self.caught_up:
self.caught_up = True self.caught_up = True
@ -568,7 +569,7 @@ class BlockProcessor(server.db.DB):
header = coin.block_header(block, self.height) header = coin.block_header(block, self.height)
header_hash = coin.header_hash(header) header_hash = coin.header_hash(header)
if header_hash != self.tip: if header_hash != self.tip:
raise ChainError('backup block {} is not tip {} at height {:,d}' raise ChainError('backup block {} not tip {} at height {:,d}'
.format(hash_to_str(header_hash), .format(hash_to_str(header_hash),
hash_to_str(self.tip), self.height)) hash_to_str(self.tip), self.height))
self.tip = coin.header_prevhash(header) self.tip = coin.header_prevhash(header)

88
server/controller.py

@ -6,7 +6,6 @@
# and warranty status of this software. # and warranty status of this software.
import asyncio import asyncio
import codecs
import json import json
import os import os
import ssl import ssl
@ -20,7 +19,7 @@ from functools import partial
import pylru import pylru
from lib.jsonrpc import JSONRPC, RPCError from lib.jsonrpc import JSONRPC, JSONSessionBase, RPCError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util import lib.util as util
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
@ -28,7 +27,6 @@ from server.daemon import Daemon, DaemonError
from server.mempool import MemPool from server.mempool import MemPool
from server.peers import PeerManager from server.peers import PeerManager
from server.session import LocalRPC, ElectrumX from server.session import LocalRPC, ElectrumX
from server.version import VERSION
class Controller(util.LoggedClass): class Controller(util.LoggedClass):
@ -88,8 +86,7 @@ class Controller(util.LoggedClass):
'address.get_proof address.listunspent ' 'address.get_proof address.listunspent '
'block.get_header block.get_chunk estimatefee relayfee ' 'block.get_header block.get_chunk estimatefee relayfee '
'transaction.get transaction.get_merkle utxo.get_address'), 'transaction.get transaction.get_merkle utxo.get_address'),
('server', ('server', 'donation_address'),
'banner donation_address'),
] ]
self.electrumx_handlers = {'.'.join([prefix, suffix]): self.electrumx_handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_')) getattr(self, suffix.replace('.', '_'))
@ -170,7 +167,7 @@ class Controller(util.LoggedClass):
def enqueue_session(self, session): def enqueue_session(self, session):
# Might have disconnected whilst waiting # Might have disconnected whilst waiting
if not session in self.sessions: if session not in self.sessions:
return return
priority = self.session_priority(session) priority = self.session_priority(session)
item = (priority, self.next_queue_id, session) item = (priority, self.next_queue_id, session)
@ -218,22 +215,29 @@ class Controller(util.LoggedClass):
def on_future_done(self, future): def on_future_done(self, future):
'''Collect the result of a future after removing it from our set.''' '''Collect the result of a future after removing it from our set.'''
callback = self.futures.pop(future) callback = self.futures.pop(future)
try:
if callback: if callback:
callback(future) callback(future)
else: else:
try:
future.result() future.result()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception: except Exception:
self.log_error(traceback.format_exc()) self.log_error(traceback.format_exc())
async def check_request_timeouts(self):
'''Regularly check pending JSON requests for timeouts.'''
while True:
await asyncio.sleep(30)
JSONSessionBase.timeout_check()
async def wait_for_bp_catchup(self): async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.''' '''Called when the block processor catches up.'''
await self.bp.caught_up_event.wait() await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up') self.logger.info('block processor has caught up')
self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers()) self.ensure_future(self.start_servers())
self.ensure_future(self.check_request_timeouts())
self.ensure_future(self.mempool.main_loop()) self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions()) self.ensure_future(self.enqueue_delayed_sessions())
self.ensure_future(self.notify()) self.ensure_future(self.notify())
@ -242,6 +246,8 @@ class Controller(util.LoggedClass):
async def main_loop(self): async def main_loop(self):
'''Controller main loop.''' '''Controller main loop.'''
if self.env.rpc_port is not None:
await self.start_server('RPC', 'localhost', self.env.rpc_port)
self.ensure_future(self.bp.main_loop()) self.ensure_future(self.bp.main_loop())
self.ensure_future(self.wait_for_bp_catchup()) self.ensure_future(self.wait_for_bp_catchup())
@ -269,6 +275,9 @@ class Controller(util.LoggedClass):
for session in self.sessions: for session in self.sessions:
self.close_session(session) self.close_session(session)
# This might resolve "future never awaited" log
await asyncio.sleep(0)
# Cancel pending futures # Cancel pending futures
for future in self.futures: for future in self.futures:
future.cancel() future.cancel()
@ -306,9 +315,7 @@ class Controller(util.LoggedClass):
.format(kind, host, port)) .format(kind, host, port))
async def start_servers(self): async def start_servers(self):
'''Start RPC, TCP and SSL servers once caught up.''' '''Start TCP and SSL servers.'''
if self.env.rpc_port is not None:
await self.start_server('RPC', 'localhost', self.env.rpc_port)
self.logger.info('max session count: {:,d}'.format(self.max_sessions)) self.logger.info('max session count: {:,d}'.format(self.max_sessions))
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
.format(self.env.session_timeout)) .format(self.env.session_timeout))
@ -459,7 +466,7 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]), 'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions), 'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(), 'pid': os.getpid(),
'peers': self.peer_mgr.count(), 'peers': self.peer_mgr.info(),
'requests': sum(s.count_pending_items() for s in self.sessions), 'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self.session_count(), 'sessions': self.session_count(),
'subs': self.sub_count(), 'subs': self.sub_count(),
@ -511,6 +518,38 @@ class Controller(util.LoggedClass):
]) ])
return result return result
@staticmethod
def peers_text_lines(data):
'''A generator returning lines for a list of peers.
data is the return value of rpc_peers().'''
def time_fmt(t):
if not t:
return 'Never'
return util.formatted_time(now - t)
now = time.time()
fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>3} '
'{:>3} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}')
yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min',
'Max', 'Pruning', 'Last Conn', 'Last Try',
'Tries', 'Source', 'IP Address')
for item in data:
features = item['features']
yield fmt.format(item['host'][:30],
item['status'],
features['tcp_port'] or '',
features['ssl_port'] or '',
features['server_version'] or 'unknown',
features['protocol_min'],
features['protocol_max'],
features['pruning'] or '',
time_fmt(item['last_connect']),
time_fmt(item['last_try']),
item['try_count'],
item['source'][:20],
item['ip_addr'] or '')
@staticmethod @staticmethod
def sessions_text_lines(data): def sessions_text_lines(data):
'''A generator returning lines for a list of sessions. '''A generator returning lines for a list of sessions.
@ -872,33 +911,6 @@ class Controller(util.LoggedClass):
# Client RPC "server" command handlers # Client RPC "server" command handlers
async def banner(self):
'''Return the server banner text.'''
banner = 'Welcome to Electrum!'
if self.env.banner_file:
try:
with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
banner = f.read()
except Exception as e:
self.log_error('reading banner file {}: {}'
.format(self.env.banner_file, e))
else:
network_info = await self.daemon_request('getnetworkinfo')
version = network_info['version']
major, minor = divmod(version, 1000000)
minor, revision = divmod(minor, 10000)
revision //= 100
version = '{:d}.{:d}.{:d}'.format(major, minor, revision)
for pair in [
('$VERSION', VERSION),
('$DAEMON_VERSION', version),
('$DAEMON_SUBVERSION', network_info['subversion']),
('$DONATION_ADDRESS', self.env.donation_address),
]:
banner = banner.replace(*pair)
return banner
def donation_address(self): def donation_address(self):
'''Return the donation address as a string, empty if there is none.''' '''Return the donation address as a string, empty if there is none.'''
return self.env.donation_address return self.env.donation_address

1
server/daemon.py

@ -20,6 +20,7 @@ import lib.util as util
class DaemonError(Exception): class DaemonError(Exception):
'''Raised when the daemon returns an error in its results.''' '''Raised when the daemon returns an error in its results.'''
class Daemon(util.LoggedClass): class Daemon(util.LoggedClass):
'''Handles connections to a daemon at the given URL.''' '''Handles connections to a daemon at the given URL.'''

1
server/db.py

@ -23,6 +23,7 @@ from server.version import VERSION
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class DB(util.LoggedClass): class DB(util.LoggedClass):
'''Simple wrapper of the backend database for querying. '''Simple wrapper of the backend database for querying.

40
server/env.py

@ -44,8 +44,14 @@ class Env(LoggedClass):
self.rpc_port = self.integer('RPC_PORT', 8000) self.rpc_port = self.integer('RPC_PORT', 8000)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None) self.banner_file = self.default('BANNER_FILE', None)
self.tor_banner_file = self.default('TOR_BANNER_FILE',
self.banner_file)
self.anon_logs = self.default('ANON_LOGS', False) self.anon_logs = self.default('ANON_LOGS', False)
self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.log_sessions = self.integer('LOG_SESSIONS', 3600)
# Tor proxy
# Python 3.5.3 - revert back to localhost?
self.tor_proxy_host = self.default('TOR_PROXY_HOST', '127.0.0.1')
self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
# The electrum client takes the empty string as unspecified # The electrum client takes the empty string as unspecified
self.donation_address = self.default('DONATION_ADDRESS', '') self.donation_address = self.default('DONATION_ADDRESS', '')
self.db_engine = self.default('DB_ENGINE', 'leveldb') self.db_engine = self.default('DB_ENGINE', 'leveldb')
@ -60,31 +66,33 @@ class Env(LoggedClass):
self.irc = self.default('IRC', False) self.irc = self.default('IRC', False)
self.irc_nick = self.default('IRC_NICK', None) self.irc_nick = self.default('IRC_NICK', None)
self.identity = NetIdentity( # Identities
main_identity = NetIdentity(
self.default('REPORT_HOST', self.host), self.default('REPORT_HOST', self.host),
self.integer('REPORT_TCP_PORT', self.tcp_port) or None, self.integer('REPORT_TCP_PORT', self.tcp_port) or None,
self.integer('REPORT_SSL_PORT', self.ssl_port) or None, self.integer('REPORT_SSL_PORT', self.ssl_port) or None,
'' ''
) )
self.tor_identity = NetIdentity( if not main_identity.host.strip():
self.default('REPORT_HOST_TOR', ''), # must be a string raise self.Error('IRC host is empty')
if main_identity.tcp_port == main_identity.ssl_port:
raise self.Error('IRC TCP and SSL ports are the same')
self.identities = [main_identity]
tor_host = self.default('REPORT_HOST_TOR', '')
if tor_host.endswith('.onion'):
self.identities.append(NetIdentity(
tor_host,
self.integer('REPORT_TCP_PORT_TOR', self.integer('REPORT_TCP_PORT_TOR',
self.identity.tcp_port main_identity.tcp_port
if self.identity.tcp_port else if main_identity.tcp_port else
self.tcp_port) or None, self.tcp_port) or None,
self.integer('REPORT_SSL_PORT_TOR', self.integer('REPORT_SSL_PORT_TOR',
self.identity.ssl_port main_identity.ssl_port
if self.identity.ssl_port else if main_identity.ssl_port else
self.ssl_port) or None, self.ssl_port) or None,
'_tor' '_tor',
) ))
if self.irc:
if not self.identity.host.strip():
raise self.Error('IRC host is empty')
if self.identity.tcp_port == self.identity.ssl_port:
raise self.Error('IRC TCP and SSL ports are the same')
def default(self, envvar, default): def default(self, envvar, default):
return environ.get(envvar, default) return environ.get(envvar, default)

12
server/irc.py

@ -50,7 +50,7 @@ class IRC(LoggedClass):
# Register handlers for events we're interested in # Register handlers for events we're interested in
reactor = irc_client.Reactor() reactor = irc_client.Reactor()
for event in 'welcome join quit whoreply disconnect'.split(): for event in 'welcome join whoreply disconnect'.split():
reactor.add_global_handler(event, getattr(self, 'on_' + event)) reactor.add_global_handler(event, getattr(self, 'on_' + event))
# Note: Multiple nicks in same channel will trigger duplicate events # Note: Multiple nicks in same channel will trigger duplicate events
@ -96,12 +96,6 @@ class IRC(LoggedClass):
if match: if match:
connection.who(match.group(1)) connection.who(match.group(1))
def on_quit(self, connection, event):
'''Called when someone leaves our channel.'''
match = self.peer_regexp.match(event.source)
if match:
self.peer_mgr.remove_irc_peer(match.group(1))
def on_whoreply(self, connection, event): def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives. '''Called when a response to our who requests arrives.
@ -111,8 +105,8 @@ class IRC(LoggedClass):
nick = event.arguments[4] nick = event.arguments[4]
if nick.startswith(self.prefix): if nick.startswith(self.prefix):
line = event.arguments[6].split() line = event.arguments[6].split()
hostname, details = line[1], line[2:] hp_string = ' '.join(line[1:]) # hostname, ports, version etc.
self.peer_mgr.add_irc_peer(nick, hostname, details) self.peer_mgr.add_irc_peer(nick, hp_string)
class IrcClient(object): class IrcClient(object):

12
server/mempool.py

@ -162,8 +162,8 @@ class MemPool(util.LoggedClass):
deferred = pending deferred = pending
pending = [] pending = []
result, deferred = await self.controller.run_in_executor \ result, deferred = await self.controller.run_in_executor(
(self.process_raw_txs, raw_txs, deferred) self.process_raw_txs, raw_txs, deferred)
pending.extend(deferred) pending.extend(deferred)
hashXs = self.hashXs hashXs = self.hashXs
@ -204,7 +204,7 @@ class MemPool(util.LoggedClass):
# Deserialize each tx and put it in our priority queue # Deserialize each tx and put it in our priority queue
for tx_hash, raw_tx in raw_tx_map.items(): for tx_hash, raw_tx in raw_tx_map.items():
if not tx_hash in txs: if tx_hash not in txs:
continue continue
tx, _tx_hash = deserializer(raw_tx).read_tx() tx, _tx_hash = deserializer(raw_tx).read_tx()
@ -267,7 +267,7 @@ class MemPool(util.LoggedClass):
unconfirmed is True if any txin is unconfirmed. unconfirmed is True if any txin is unconfirmed.
''' '''
# hashXs is a defaultdict # hashXs is a defaultdict
if not hashX in self.hashXs: if hashX not in self.hashXs:
return [] return []
deserializer = self.coin.deserializer() deserializer = self.coin.deserializer()
@ -279,8 +279,8 @@ class MemPool(util.LoggedClass):
if not item or not raw_tx: if not item or not raw_tx:
continue continue
txin_pairs, txout_pairs = item txin_pairs, txout_pairs = item
tx_fee = (sum(v for hashX, v in txin_pairs) tx_fee = (sum(v for hashX, v in txin_pairs) -
- sum(v for hashX, v in txout_pairs)) sum(v for hashX, v in txout_pairs))
tx, tx_hash = deserializer(raw_tx).read_tx() tx, tx_hash = deserializer(raw_tx).read_tx()
unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs) unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs)
result.append((hex_hash, tx_fee, unconfirmed)) result.append((hex_hash, tx_fee, unconfirmed))

544
server/peers.py

@ -7,15 +7,184 @@
'''Peer management.''' '''Peer management.'''
import ast
import asyncio import asyncio
import socket import random
from collections import namedtuple import ssl
import time
from collections import defaultdict, Counter
from functools import partial
from lib.jsonrpc import JSONSession
from lib.peer import Peer
from lib.socks import SocksProxy
import lib.util as util import lib.util as util
from server.irc import IRC from server.irc import IRC
import server.version as version
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details') PEERS_FILE = 'peers'
PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4)
STALE_SECS = 86400
WAKEUP_SECS = 300
def peer_from_env(env):
'''Return ourself as a peer from the environment settings.'''
main_identity = env.identities[0]
hosts = {identity.host: {'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port}
for identity in env.identities}
features = {
'hosts': hosts,
'pruning': None,
'server_version': version.VERSION,
'protocol_min': version.PROTOCOL_MIN,
'protocol_max': version.PROTOCOL_MAX,
'genesis_hash': env.coin.GENESIS_HASH,
}
return Peer(main_identity.host, features, 'env')
class PeerSession(JSONSession):
'''An outgoing session to a peer.'''
def __init__(self, peer, peer_mgr, kind):
super().__init__()
self.max_send = 0
self.peer = peer
self.peer_mgr = peer_mgr
self.kind = kind
self.failed = False
self.log_prefix = '[{}] '.format(self.peer)
def have_pending_items(self):
self.peer_mgr.ensure_future(self.process_pending_items())
def connection_made(self, transport):
'''Handle an incoming client connection.'''
super().connection_made(transport)
self.log_prefix = '[{}] '.format(str(self.peer)[:25])
# Update IP address
if not self.peer.is_tor:
peer_info = self.peer_info()
if peer_info:
self.peer.ip_addr = peer_info[0]
# Collect data
proto_ver = (version.PROTOCOL_MIN, version.PROTOCOL_MAX)
self.send_request(self.on_version, 'server.version',
[version.VERSION, proto_ver])
self.send_request(self.on_peers_subscribe, 'server.peers.subscribe')
self.send_request(self.on_features, 'server.features')
def connection_lost(self, exc):
'''Handle disconnection.'''
super().connection_lost(exc)
self.peer_mgr.connection_lost(self)
def on_peers_subscribe(self, result, error):
'''Handle the response to the peers.subcribe message.'''
if error:
self.failed = True
self.log_error('server.peers.subscribe: {}'.format(error))
else:
self.check_remote_peers(result)
self.close_if_done()
def check_remote_peers(self, updates):
'''When a peer gives us a peer update.
Each update is expected to be of the form:
[ip_addr, hostname, ['v1.0', 't51001', 's51002']]
Return True if we're in the list of peers.
'''
try:
real_names = [' '.join([u[1]] + u[2]) for u in updates]
peers = [Peer.from_real_name(real_name, str(self.peer))
for real_name in real_names]
except Exception:
self.log_error('bad server.peers.subscribe response')
return False
self.peer_mgr.add_peers(peers)
my = self.peer_mgr.myself
for peer in my.matches(peers):
if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port:
return
# Announce ourself if not present
self.log_info('registering with server.add_peer')
self.send_request(self.on_add_peer, 'server.add_peer', [my.features])
def on_add_peer(self, result, error):
'''Handle the response to the add_peer message.'''
self.close_if_done()
def on_features(self, features, error):
# Several peers don't implement this. If they do, check they are
# the same network with the genesis hash.
verified = False
if not error and isinstance(features, dict):
forget = False
our_hash = self.peer_mgr.env.coin.GENESIS_HASH
their_hash = features.get('genesis_hash')
if their_hash:
verified = their_hash == our_hash
forget = their_hash != our_hash
if forget:
self.failed = True
self.peer.mark_bad()
self.log_warning('incorrect genesis hash')
else:
self.peer.update_features(features)
# For legacy peers not implementing features, check their height
# as a proxy to determining they're on our network
if not verified:
self.send_request(self.on_headers, 'blockchain.headers.subscribe')
self.close_if_done()
def on_headers(self, result, error):
'''Handle the response to the version message.'''
if error or not isinstance(result, dict):
self.failed = True
self.log_error('bad blockchain.headers.subscribe response')
else:
our_height = self.peer_mgr.controller.bp.db_height
their_height = result.get('block_height')
if (not isinstance(their_height, int) or
abs(our_height - their_height) > 5):
self.failed = True
self.peer.mark_bad()
self.log_warning('bad height {}'.format(their_height))
self.close_if_done()
def on_version(self, result, error):
'''Handle the response to the version message.'''
if error:
self.failed = True
self.log_error('server.version returned an error')
elif isinstance(result, str):
self.peer.server_version = result
self.peer.features['server_version'] = result
self.close_if_done()
def close_if_done(self):
if not self.has_pending_requests():
is_good = not self.failed
self.peer.last_connect = time.time()
self.peer_mgr.set_connection_status(self.peer, is_good)
if is_good:
if self.peer.is_tor:
self.log_info('verified via {} over Tor'.format(self.kind))
else:
self.log_info('verified via {} at {}'
.format(self.kind,
self.peer_addr(anon=False)))
self.close_connection()
class PeerManager(util.LoggedClass): class PeerManager(util.LoggedClass):
@ -24,88 +193,325 @@ class PeerManager(util.LoggedClass):
Attempts to maintain a connection with up to 8 peers. Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data. Issues a 'peers.subscribe' RPC to them and tells them our data.
''' '''
PROTOCOL_VERSION = '1.0'
def __init__(self, env, controller): def __init__(self, env, controller):
super().__init__() super().__init__()
# Initialise the Peer class
Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS
self.env = env self.env = env
self.controller = controller self.controller = controller
self.loop = controller.loop
self.irc = IRC(env, self) self.irc = IRC(env, self)
self.pruning = None self.myself = peer_from_env(env)
self._identities = [] # value is max outgoing connections at a time
# Keyed by nick self.semaphore = asyncio.BoundedSemaphore(value=8)
self.irc_peers = {} self.retry_event = asyncio.Event()
self._identities.append(env.identity) # Peers have one entry per hostname. Once connected, the
if env.tor_identity.host.endswith('.onion'): # ip_addr property is either None, an onion peer, or the
self._identities.append(env.tor_identity) # IP address that was connected to. Adding a peer will evict
# any other peers with the same host name or IP address.
def real_name(self, host, protocol_version, tcp_port, ssl_port): self.peers = set()
'''Real name as used on IRC.''' self.onion_peers = []
default_ports = self.env.coin.PEER_DEFAULT_PORTS self.last_tor_retry_time = 0
self.tor_proxy = SocksProxy(env.tor_proxy_host, env.tor_proxy_port,
def port_text(letter, port): loop=self.loop)
if port == default_ports.get(letter): self.import_peers()
return letter
def info(self):
'''The number of peers.'''
self.set_peer_statuses()
counter = Counter(peer.status for peer in self.peers)
return {
'bad': counter[PEER_BAD],
'good': counter[PEER_GOOD],
'never': counter[PEER_NEVER],
'stale': counter[PEER_STALE],
'total': len(self.peers),
}
def set_peer_statuses(self):
'''Set peer statuses.'''
cutoff = time.time() - STALE_SECS
for peer in self.peers:
if peer.bad:
peer.status = PEER_BAD
elif peer.last_connect > cutoff:
peer.status = PEER_GOOD
elif peer.last_connect:
peer.status = PEER_STALE
else:
peer.status = PEER_NEVER
def rpc_data(self):
'''Peer data for the peers RPC method.'''
self.set_peer_statuses()
descs = ['good', 'stale', 'never', 'bad']
def peer_data(peer):
data = peer.serialize()
data['status'] = descs[peer.status]
return data
def peer_key(peer):
return (peer.bad, -peer.last_connect)
return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]
def add_peers(self, peers, limit=3, check_ports=False, source=None):
'''Add a limited number of peers that are not already present.'''
retry = False
new_peers = []
for peer in peers:
matches = peer.matches(self.peers)
if not matches:
new_peers.append(peer)
elif check_ports:
for match in matches:
if match.check_ports(peer):
self.logger.info('ports changed for {}'.format(peer))
retry = True
if new_peers:
retry = True
source = source or new_peers[0].source
if limit:
random.shuffle(new_peers)
use_peers = new_peers[:limit]
else:
use_peers = new_peers
self.logger.info('accepted {:d}/{:d} new peers of {:d} from {}'
.format(len(use_peers), len(new_peers),
len(peers), source))
self.peers.update(use_peers)
if retry:
self.retry_event.set()
def on_add_peer(self, features, source):
'''Add peers from an incoming connection.'''
peers = Peer.peers_from_features(features, source)
if peers:
self.log_info('add_peer request received from {}'
.format(peers[0].host))
self.add_peers(peers, check_ports=True)
return bool(peers)
def on_peers_subscribe(self, is_tor):
'''Returns the server peers as a list of (ip, host, details) tuples.
We return all peers we've connected to in the last day.
Additionally, if we don't have onion routing, we return up to
three randomly selected onion servers.
'''
cutoff = time.time() - STALE_SECS
recent = [peer for peer in self.peers
if peer.last_connect > cutoff and
not peer.bad and peer.is_public]
onion_peers = []
# Always report ourselves if valid (even if not public)
peers = set()
if self.myself.last_connect > cutoff:
peers.add(self.myself)
# Bucket the clearnet peers and select one from each
buckets = defaultdict(list)
for peer in recent:
if peer.is_tor:
onion_peers.append(peer)
else: else:
return letter + str(port) buckets[peer.bucket()].append(peer)
peers.update(random.choice(bpeers) for bpeers in buckets.values())
parts = [host, 'v' + protocol_version] # Add up to 20% onion peers (but up to 10 is OK anyway)
for letter, port in (('s', ssl_port), ('t', tcp_port)): onion_peers = onion_peers or self.onion_peers
if port: random.shuffle(onion_peers)
parts.append(port_text(letter, port)) max_onion = 50 if is_tor else max(10, len(peers) // 4)
return ' '.join(parts)
def irc_name_pairs(self): peers.update(onion_peers[:max_onion])
return [(self.real_name(identity.host, self.PROTOCOL_VERSION,
identity.tcp_port, identity.ssl_port),
identity.nick_suffix)
for identity in self._identities]
def identities(self): return [peer.to_tuple() for peer in peers]
'''Return a list of network identities of this server.'''
return self._identities def serialize(self):
serialized_peers = [peer.serialize() for peer in self.peers
if not peer.bad]
data = (1, serialized_peers) # version 1
return repr(data)
def write_peers_file(self):
with util.open_truncate(PEERS_FILE) as f:
f.write(self.serialize().encode())
self.logger.info('wrote out {:,d} peers'.format(len(self.peers)))
def read_peers_file(self):
try:
with util.open_file(PEERS_FILE, create=True) as f:
data = f.read(-1).decode()
except Exception as e:
self.logger.error('error reading peers file {}'.format(e))
else:
if data:
version, items = ast.literal_eval(data)
if version == 1:
peers = [Peer.deserialize(item) for item in items]
self.add_peers(peers, source='peers file', limit=None)
def import_peers(self):
'''Import hard-coded peers from a file or the coin defaults.'''
self.add_peers([self.myself])
coin_peers = self.env.coin.PEERS
self.onion_peers = [Peer.from_real_name(rn, 'coins.py')
for rn in coin_peers if '.onion ' in rn]
# If we don't have many peers in the peers file, add
# hard-coded ones
self.read_peers_file()
if len(self.peers) < 5:
peers = [Peer.from_real_name(real_name, 'coins.py')
for real_name in coin_peers]
self.add_peers(peers, limit=None)
def connect_to_irc(self):
'''Connect to IRC if not disabled.'''
if self.env.irc and self.env.coin.IRC_PREFIX:
pairs = [(self.myself.real_name(ident.host), ident.nick_suffix)
for ident in self.env.identities]
self.ensure_future(self.irc.start(pairs))
else:
self.logger.info('IRC is disabled')
def add_irc_peer(self, nick, real_name):
'''Add an IRC peer.'''
peer = Peer.from_real_name(real_name, '{}'.format(nick))
self.add_peers([peer])
def ensure_future(self, coro, callback=None): def ensure_future(self, coro, callback=None):
'''Schedule the coro to be run.''' '''Schedule the coro to be run.'''
return self.controller.ensure_future(coro, callback=callback) return self.controller.ensure_future(coro, callback=callback)
async def main_loop(self): async def main_loop(self):
'''Not a loop for now...''' '''Main loop performing peer maintenance. This includes
if self.env.irc:
self.ensure_future(self.irc.start(self.irc_name_pairs()))
else:
self.logger.info('IRC is disabled')
def dns_lookup_peer(self, nick, hostname, details): 1) Forgetting unreachable peers.
try: 2) Verifying connectivity of new peers.
ip_addr = None 3) Retrying old peers at regular intervals.
'''
self.connect_to_irc()
try: try:
ip_addr = socket.gethostbyname(hostname) while True:
except socket.error: timeout = self.loop.call_later(WAKEUP_SECS,
pass # IPv6? self.retry_event.set)
ip_addr = ip_addr or hostname await self.retry_event.wait()
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details) self.retry_event.clear()
self.logger.info('new IRC peer {} at {} ({})' timeout.cancel()
.format(nick, hostname, details)) await self.retry_peers()
except UnicodeError: finally:
# UnicodeError comes from invalid domains (issue #68) self.write_peers_file()
self.logger.info('IRC peer domain {} invalid'.format(hostname))
def add_irc_peer(self, *args):
'''Schedule DNS lookup of peer.'''
self.controller.schedule_executor(self.dns_lookup_peer, *args)
def remove_irc_peer(self, nick):
'''Remove a peer from our IRC peers map.'''
self.logger.info('removing IRC peer {}'.format(nick))
self.irc_peers.pop(nick, None)
def count(self):
return len(self.irc_peers)
def rpc_data(self): def is_coin_onion_peer(self, peer):
return self.irc_peers '''Return true if this peer is a hard-coded onion peer.'''
return peer.is_tor and any(peer.host in real_name
for real_name in self.env.coin.PEERS)
async def retry_peers(self):
'''Retry peers that are close to getting stale.'''
# Exponential backoff of retries
now = time.time()
nearly_stale_time = (now - STALE_SECS) + WAKEUP_SECS * 2
def retry_peer(peer):
# Try some Tor at startup to determine the proxy so we can
# serve the right banner file
if self.last_tor_retry_time == 0 and self.is_coin_onion_peer(peer):
return True
# Retry a peer whose ports might have updated
if peer.other_port_pairs:
return True
# Retry a good connection if it is about to turn stale
if peer.try_count == 0:
return peer.last_connect < nearly_stale_time
# Retry a failed connection if enough time has passed
return peer.last_try < now - WAKEUP_SECS * 2 ** peer.try_count
peers = [peer for peer in self.peers if retry_peer(peer)]
# If we don't have a tor proxy drop tor peers, but retry
# occasionally
if self.tor_proxy.port is None:
if now < self.last_tor_retry_time + 3600:
peers = [peer for peer in peers if not peer.is_tor]
elif any(peer.is_tor for peer in peers):
self.last_tor_retry_time = now
for peer in peers:
peer.last_try = time.time()
peer.try_count += 1
pairs = peer.connection_port_pairs()
if peer.bad or not pairs:
self.maybe_forget_peer(peer)
else:
await self.semaphore.acquire()
self.retry_peer(peer, pairs)
def retry_peer(self, peer, port_pairs):
kind, port = port_pairs[0]
# Python 3.5.3: use PROTOCOL_TLS
sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) if kind == 'SSL' else None
if peer.is_tor:
create_connection = self.tor_proxy.create_connection
else:
create_connection = self.loop.create_connection
protocol_factory = partial(PeerSession, peer, self, kind)
coro = create_connection(protocol_factory, peer.host, port, ssl=sslc)
callback = partial(self.connection_done, peer, port_pairs)
self.ensure_future(coro, callback)
def connection_done(self, peer, port_pairs, future):
'''Called when a connection attempt succeeds or fails.
If failed, log it and try remaining port pairs. If none,
release the connection count semaphore.
'''
exception = future.exception()
if exception:
kind, port = port_pairs[0]
self.logger.info('failed connecting to {} at {} port {:d}: {}'
.format(peer, kind, port, exception))
port_pairs = port_pairs[1:]
if port_pairs:
self.retry_peer(peer, port_pairs)
else:
self.set_connection_status(peer, False)
self.semaphore.release()
def connection_lost(self, session):
'''Called by the peer session when disconnected.'''
self.semaphore.release()
def set_connection_status(self, peer, good):
'''Called when a connection succeeded or failed.'''
if good:
peer.try_count = 0
peer.source = 'peer'
# Remove matching IP addresses
for match in peer.matches(self.peers):
if match != peer and peer.host == peer.ip_addr:
self.peers.remove(match)
else:
self.maybe_forget_peer(peer)
def maybe_forget_peer(self, peer):
'''Forget the peer if appropriate, e.g. long-term unreachable.'''
if peer.bad:
forget = peer.last_connect < time.time() - STALE_SECS // 2
else:
try_limit = 10 if peer.last_connect else 3
forget = peer.try_count >= try_limit
if forget:
desc = 'bad' if peer.bad else 'unreachable'
self.logger.info('forgetting {} peer: {}'.format(desc, peer))
self.peers.discard(peer)
def on_peers_subscribe(self): return forget
'''Returns the server peers as a list of (ip, host, details) tuples.'''
return list(self.irc_peers.values())

78
server/session.py

@ -7,12 +7,13 @@
'''Classes for local RPC server and remote client TCP/SSL servers.''' '''Classes for local RPC server and remote client TCP/SSL servers.'''
import codecs
import time import time
from functools import partial from functools import partial
from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2 from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2
from server.daemon import DaemonError from server.daemon import DaemonError
from server.version import VERSION import server.version as version
class SessionBase(JSONSession): class SessionBase(JSONSession):
@ -32,6 +33,7 @@ class SessionBase(JSONSession):
self.env = controller.env self.env = controller.env
self.daemon = self.bp.daemon self.daemon = self.bp.daemon
self.client = 'unknown' self.client = 'unknown'
self.protocol_version = '1.0'
self.anon_logs = self.env.anon_logs self.anon_logs = self.env.anon_logs
self.last_delay = 0 self.last_delay = 0
self.txs_sent = 0 self.txs_sent = 0
@ -42,6 +44,7 @@ class SessionBase(JSONSession):
self.bw_time = self.start_time self.bw_time = self.start_time
self.bw_interval = 3600 self.bw_interval = 3600
self.bw_used = 0 self.bw_used = 0
self.peer_added = False
def have_pending_items(self): def have_pending_items(self):
'''Called each time the pending item queue goes from empty to having '''Called each time the pending item queue goes from empty to having
@ -74,6 +77,7 @@ class SessionBase(JSONSession):
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
super().connection_lost(exc)
msg = '' msg = ''
if self.pause: if self.pause:
msg += ' whilst paused' msg += ' whilst paused'
@ -116,6 +120,8 @@ class ElectrumX(SessionBase):
'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.headers.subscribe': self.headers_subscribe,
'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe,
'blockchain.transaction.broadcast': self.transaction_broadcast, 'blockchain.transaction.broadcast': self.transaction_broadcast,
'server.add_peer': self.add_peer,
'server.banner': self.banner,
'server.features': self.server_features, 'server.features': self.server_features,
'server.peers.subscribe': self.peers_subscribe, 'server.peers.subscribe': self.peers_subscribe,
'server.version': self.server_version, 'server.version': self.server_version,
@ -171,9 +177,19 @@ class ElectrumX(SessionBase):
self.subscribe_height = True self.subscribe_height = True
return self.height() return self.height()
def add_peer(self, features):
'''Add a peer.'''
if self.peer_added:
return False
peer_mgr = self.controller.peer_mgr
peer_info = self.peer_info()
source = peer_info[0] if peer_info else 'unknown'
self.peer_added = peer_mgr.on_add_peer(features, source)
return self.peer_added
def peers_subscribe(self): def peers_subscribe(self):
'''Return the server peers as a list of (ip, host, details) tuples.''' '''Return the server peers as a list of (ip, host, details) tuples.'''
return self.controller.peer_mgr.on_peers_subscribe() return self.controller.peer_mgr.on_peers_subscribe(self.is_tor())
async def address_subscribe(self, address): async def address_subscribe(self, address):
'''Subscribe to an address. '''Subscribe to an address.
@ -190,18 +206,50 @@ class ElectrumX(SessionBase):
def server_features(self): def server_features(self):
'''Returns a dictionary of server features.''' '''Returns a dictionary of server features.'''
peer_mgr = self.controller.peer_mgr return self.controller.peer_mgr.myself.features
hosts = {identity.host: {
'tcp_port': identity.tcp_port, def is_tor(self):
'ssl_port': identity.ssl_port, '''Try to detect if the connection is to a tor hidden service we are
} for identity in peer_mgr.identities()} running.'''
tor_proxy = self.controller.peer_mgr.tor_proxy
return { peer_info = self.peer_info()
'hosts': hosts, return peer_info and peer_info[0] == tor_proxy.ip_addr
'pruning': peer_mgr.pruning,
'protocol_version': peer_mgr.PROTOCOL_VERSION, async def replaced_banner(self, banner):
'server_version': VERSION, network_info = await self.controller.daemon_request('getnetworkinfo')
} ni_version = network_info['version']
major, minor = divmod(ni_version, 1000000)
minor, revision = divmod(minor, 10000)
revision //= 100
daemon_version = '{:d}.{:d}.{:d}'.format(major, minor, revision)
for pair in [
('$VERSION', version.VERSION),
('$DAEMON_VERSION', daemon_version),
('$DAEMON_SUBVERSION', network_info['subversion']),
('$DONATION_ADDRESS', self.env.donation_address),
]:
banner = banner.replace(*pair)
return banner
async def banner(self):
'''Return the server banner text.'''
banner = 'Welcome to Electrum!'
if self.is_tor():
banner_file = self.env.tor_banner_file
else:
banner_file = self.env.banner_file
if banner_file:
try:
with codecs.open(banner_file, 'r', 'utf-8') as f:
banner = f.read()
except Exception as e:
self.log_error('reading banner file {}: {}'
.format(banner_file, e))
else:
banner = await self.replaced_banner(banner)
return banner
def server_version(self, client_name=None, protocol_version=None): def server_version(self, client_name=None, protocol_version=None):
'''Returns the server version as a string. '''Returns the server version as a string.
@ -213,7 +261,7 @@ class ElectrumX(SessionBase):
self.client = str(client_name)[:17] self.client = str(client_name)[:17]
if protocol_version is not None: if protocol_version is not None:
self.protocol_version = protocol_version self.protocol_version = protocol_version
return VERSION return version.VERSION
async def transaction_broadcast(self, raw_tx): async def transaction_broadcast(self, raw_tx):
'''Broadcast a raw transaction to the network. '''Broadcast a raw transaction to the network.

1
server/storage.py

@ -12,6 +12,7 @@ from functools import partial
import lib.util as util import lib.util as util
def db_class(name): def db_class(name):
'''Returns a DB engine class.''' '''Returns a DB engine class.'''
for db_class in util.subclasses(Storage): for db_class in util.subclasses(Storage):

6
server/version.py

@ -1 +1,5 @@
VERSION = "ElectrumX 0.10.19" # Server name and protocol versions
VERSION = 'ElectrumX 0.11.0'
PROTOCOL_MIN = '1.0'
PROTOCOL_MAX = '1.0'

2
setup.py

@ -10,7 +10,7 @@ setuptools.setup(
# "irc" package is only required if IRC connectivity is enabled # "irc" package is only required if IRC connectivity is enabled
# via environment variables, in which case I've tested with 15.0.4 # via environment variables, in which case I've tested with 15.0.4
# "x11_hash" package (1.4) is required to sync DASH network. # "x11_hash" package (1.4) is required to sync DASH network.
install_requires=['plyvel', 'pylru', 'aiohttp >= 1'], install_requires=['plyvel', 'pylru', 'irc', 'aiohttp >= 1'],
packages=setuptools.find_packages(), packages=setuptools.find_packages(),
description='ElectrumX Server', description='ElectrumX Server',
author='Neil Booth', author='Neil Booth',

4
tests/test_storage.py

@ -44,8 +44,8 @@ def test_batch(db):
def test_iterator(db): def test_iterator(db):
""" """
The iterator should contain all key/value pairs starting with prefix ordered The iterator should contain all key/value pairs starting with prefix
by key. ordered by key.
""" """
for i in range(5): for i in range(5):
db.put(b"abc" + str.encode(str(i)), str.encode(str(i))) db.put(b"abc" + str.encode(str(i)), str.encode(str(i)))

3
tests/test_util.py

@ -19,7 +19,6 @@ def test_cachedproperty():
cls.CALL_COUNT += 1 cls.CALL_COUNT += 1
return cls.CALL_COUNT return cls.CALL_COUNT
t = Target() t = Target()
assert t.prop == t.prop == 1 assert t.prop == t.prop == 1
assert Target.cls_prop == Target.cls_prop == 1 assert Target.cls_prop == Target.cls_prop == 1
@ -56,4 +55,4 @@ def test_chunks():
def test_increment_byte_string(): def test_increment_byte_string():
assert util.increment_byte_string(b'1') == b'2' assert util.increment_byte_string(b'1') == b'2'
assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02'
assert util.increment_byte_string(b'\xff\xff') == None assert util.increment_byte_string(b'\xff\xff') is None

Loading…
Cancel
Save