ghost43
3 years ago
committed by
GitHub
18 changed files with 1393 additions and 0 deletions
@ -0,0 +1,2 @@ |
|||||
|
KERNEL=="ttyUSB*", SUBSYSTEMS=="usb", ATTRS{idVendor}=="10c4", ATTRS{idProduct}=="ea60", MODE="0660", GROUP="plugdev", TAG+="uaccess", TAG+="udev-acl", SYMLINK+="jade%n" |
||||
|
KERNEL=="ttyACM*", SUBSYSTEMS=="usb", ATTRS{idVendor}=="1a86", ATTRS{idProduct}=="55d4", MODE="0660", GROUP="plugdev", TAG+="uaccess", TAG+="udev-acl", SYMLINK+="jade%n" |
After Width: | Height: | Size: 3.4 KiB |
After Width: | Height: | Size: 3.4 KiB |
@ -0,0 +1,7 @@ |
|||||
|
from electrum.i18n import _ |
||||
|
|
||||
|
fullname = 'Blockstream Jade Wallet' |
||||
|
description = 'Provides support for the Blockstream Jade hardware wallet' |
||||
|
#requires = [('', 'github.com/')] |
||||
|
registers_keystore = ('hardware', 'jade', _("Jade wallet")) |
||||
|
available_for = ['qt', 'cmdline'] |
@ -0,0 +1,14 @@ |
|||||
|
from electrum.plugin import hook |
||||
|
from .jade import JadePlugin |
||||
|
from ..hw_wallet import CmdLineHandler |
||||
|
|
||||
|
class Plugin(JadePlugin): |
||||
|
handler = CmdLineHandler() |
||||
|
@hook |
||||
|
def init_keystore(self, keystore): |
||||
|
if not isinstance(keystore, self.keystore_class): |
||||
|
return |
||||
|
keystore.handler = self.handler |
||||
|
|
||||
|
def create_handler(self, window): |
||||
|
return self.handler |
@ -0,0 +1,476 @@ |
|||||
|
import os |
||||
|
import base64 |
||||
|
import json |
||||
|
from typing import Optional |
||||
|
|
||||
|
from electrum import bip32, constants |
||||
|
from electrum.crypto import sha256 |
||||
|
from electrum.i18n import _ |
||||
|
from electrum.keystore import Hardware_KeyStore |
||||
|
from electrum.transaction import Transaction |
||||
|
from electrum.wallet import Multisig_Wallet |
||||
|
from electrum.util import UserFacingException |
||||
|
from electrum.base_wizard import ScriptTypeNotSupported |
||||
|
from electrum.logging import get_logger |
||||
|
from electrum.plugin import runs_in_hwd_thread, Device |
||||
|
from electrum.network import Network |
||||
|
|
||||
|
from ..hw_wallet import HW_PluginBase, HardwareClientBase |
||||
|
from ..hw_wallet.plugin import OutdatedHwFirmwareException |
||||
|
|
||||
|
|
||||
|
_logger = get_logger(__name__) |
||||
|
|
||||
|
#import logging |
||||
|
#LOGGING = logging.INFO |
||||
|
#if LOGGING: |
||||
|
# logger = logging.getLogger('jade') |
||||
|
# logger.setLevel(LOGGING) |
||||
|
# device_logger = logging.getLogger('jade-device') |
||||
|
# device_logger.setLevel(LOGGING) |
||||
|
|
||||
|
try: |
||||
|
# Do imports |
||||
|
from .jadepy.jade import JadeAPI |
||||
|
from serial.tools import list_ports |
||||
|
except ImportError as e: |
||||
|
_logger.exception('error importing Jade plugin deps') |
||||
|
|
||||
|
# Ignore -beta and -rc etc labels |
||||
|
def _versiontuple(v): |
||||
|
return tuple(map(int, (v.split('-')[0].split('.')))) |
||||
|
|
||||
|
def _is_multisig(wallet): |
||||
|
return type(wallet) is Multisig_Wallet |
||||
|
|
||||
|
# Ensure a multisig wallet is registered on Jade hw. |
||||
|
# Derives and returns the deterministic name for that multisig registration |
||||
|
def _register_multisig_wallet(wallet, keystore, address): |
||||
|
wallet_fingerprint_hash = sha256(wallet.get_fingerprint()) |
||||
|
multisig_name = 'ele' + wallet_fingerprint_hash.hex()[:12] |
||||
|
|
||||
|
# Collect all the signer data in case we need to register the |
||||
|
# multisig wallet on the Jade hw - NOTE: re-register is a no-op. |
||||
|
signers = [] |
||||
|
for kstore in wallet.get_keystores(): |
||||
|
fingerprint = kstore.get_root_fingerprint() |
||||
|
bip32_path_prefix = kstore.get_derivation_prefix() |
||||
|
derivation_path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
||||
|
|
||||
|
# Jade only understands standard xtypes, so convert here |
||||
|
node = bip32.BIP32Node.from_xkey(kstore.xpub) |
||||
|
standard_xpub = node._replace(xtype='standard').to_xkey() |
||||
|
|
||||
|
signers.append({'fingerprint': bytes.fromhex(fingerprint), |
||||
|
'derivation': derivation_path, |
||||
|
'xpub': standard_xpub, |
||||
|
'path': []}) |
||||
|
|
||||
|
# Check multisig is registered - re-registering is a no-op |
||||
|
# NOTE: electrum multisigs appear to always be sorted-multisig |
||||
|
txin_type = wallet.get_txin_type(address) |
||||
|
keystore.register_multisig(multisig_name, txin_type, True, wallet.m, signers) |
||||
|
|
||||
|
# Return the name used to register the wallet |
||||
|
return multisig_name |
||||
|
|
||||
|
# Helper to adapt Jade's http call/data to Network.send_http_on_proxy() |
||||
|
def _http_request(params): |
||||
|
# Use the first non-onion url |
||||
|
url = [url for url in params['urls'] if not url.endswith('.onion')][0] |
||||
|
method = params['method'].lower() |
||||
|
json_payload = params.get('data') |
||||
|
json_response = Network.send_http_on_proxy(method, url, json=json_payload) |
||||
|
return {'body': json.loads(json_response)} |
||||
|
|
||||
|
class Jade_Client(HardwareClientBase): |
||||
|
|
||||
|
@staticmethod |
||||
|
def _network() -> str: |
||||
|
return 'localtest' if constants.net.NET_NAME == 'regtest' else constants.net.NET_NAME |
||||
|
|
||||
|
ADDRTYPES = {'standard': 'pkh(k)', |
||||
|
'p2pkh': 'pkh(k)', |
||||
|
'p2wpkh': 'wpkh(k)', |
||||
|
'p2wpkh-p2sh': 'sh(wpkh(k))'} |
||||
|
|
||||
|
MULTI_ADDRTYPES = {'standard': 'sh(multi(k))', |
||||
|
'p2sh': 'sh(multi(k))', |
||||
|
'p2wsh': 'wsh(multi(k))', |
||||
|
'p2wsh-p2sh': 'sh(wsh(multi(k)))'} |
||||
|
|
||||
|
@classmethod |
||||
|
def _convertAddrType(cls, addrType: str, multisig: bool) -> str: |
||||
|
return cls.MULTI_ADDRTYPES[addrType] if multisig else cls.ADDRTYPES[addrType] |
||||
|
|
||||
|
def __init__(self, device: str, plugin: HW_PluginBase): |
||||
|
HardwareClientBase.__init__(self, plugin=plugin) |
||||
|
|
||||
|
# Connect with a small timeout to test connection |
||||
|
self.jade = JadeAPI.create_serial(device, timeout=1) |
||||
|
self.jade.connect() |
||||
|
|
||||
|
verinfo = self.jade.get_version_info() |
||||
|
self.fwversion = _versiontuple(verinfo['JADE_VERSION']) |
||||
|
self.efusemac = verinfo['EFUSEMAC'] |
||||
|
self.jade.disconnect() |
||||
|
|
||||
|
# Reconnect with a the default timeout for all subsequent calls |
||||
|
self.jade = JadeAPI.create_serial(device) |
||||
|
self.jade.connect() |
||||
|
|
||||
|
# Push some host entropy into jade |
||||
|
self.jade.add_entropy(os.urandom(32)) |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def authenticate(self): |
||||
|
# Ensure Jade unlocked - always call hw unit at least once |
||||
|
# If the hw is already unlocked, this call returns immediately/no-op |
||||
|
# NOTE: uses provided http/networking which respects any user proxy |
||||
|
authenticated = False |
||||
|
while not authenticated: |
||||
|
authenticated = self.jade.auth_user(self._network(), http_request_fn=_http_request) |
||||
|
|
||||
|
def is_pairable(self): |
||||
|
return True |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def close(self): |
||||
|
self.jade.disconnect() |
||||
|
self.jade = None |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def is_initialized(self): |
||||
|
verinfo = self.jade.get_version_info() |
||||
|
return verinfo['JADE_STATE'] != 'UNINIT' |
||||
|
|
||||
|
def label(self) -> Optional[str]: |
||||
|
return self.efusemac[-6:] |
||||
|
|
||||
|
def get_soft_device_id(self): |
||||
|
return f'Jade {self.label()}' |
||||
|
|
||||
|
def device_model_name(self): |
||||
|
return 'Blockstream Jade' |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def has_usable_connection_with_device(self): |
||||
|
if self.efusemac is None: |
||||
|
return False |
||||
|
|
||||
|
try: |
||||
|
verinfo = self.jade.get_version_info() |
||||
|
return verinfo['EFUSEMAC'] == self.efusemac |
||||
|
except BaseException: |
||||
|
return False |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def get_xpub(self, bip32_path, xtype): |
||||
|
self.authenticate() |
||||
|
|
||||
|
# Jade only provides traditional xpubs ... |
||||
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path) |
||||
|
xpub = self.jade.get_xpub(self._network(), path) |
||||
|
|
||||
|
# ... so convert to relevant xtype locally |
||||
|
node = bip32.BIP32Node.from_xkey(xpub) |
||||
|
return node._replace(xtype=xtype).to_xkey() |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def sign_message(self, bip32_path_prefix, sequence, message): |
||||
|
self.authenticate() |
||||
|
|
||||
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
||||
|
path.extend(sequence) |
||||
|
|
||||
|
if isinstance(message, bytes) or isinstance(message, bytearray): |
||||
|
message = message.decode('utf-8') |
||||
|
|
||||
|
# Signature verification does not work with anti-exfil, so stick with default (rfc6979) |
||||
|
sig = self.jade.sign_message(path, message) |
||||
|
return base64.b64decode(sig) |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def sign_tx(self, txn_bytes, inputs, change): |
||||
|
self.authenticate() |
||||
|
|
||||
|
# Add some host entropy for AE sigs (although we won't verify) |
||||
|
for input in inputs: |
||||
|
if input['path'] is not None: |
||||
|
input['ae_host_entropy'] = os.urandom(32) |
||||
|
input['ae_host_commitment'] = os.urandom(32) |
||||
|
|
||||
|
# Map change script type |
||||
|
for output in change: |
||||
|
if output and output.get('variant') is not None: |
||||
|
output['variant'] = self._convertAddrType(output['variant'], False) |
||||
|
|
||||
|
# Pass to Jade to generate signatures |
||||
|
sig_data = self.jade.sign_tx(self._network(), txn_bytes, inputs, change, use_ae_signatures=True) |
||||
|
|
||||
|
# Extract signatures from returned data (sig[0] is the AE signer-commitment) |
||||
|
return [sig[1] for sig in sig_data] |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def show_address(self, bip32_path_prefix, sequence, txin_type): |
||||
|
self.authenticate() |
||||
|
path = bip32.convert_bip32_path_to_list_of_uint32(bip32_path_prefix) |
||||
|
path.extend(sequence) |
||||
|
script_variant = self._convertAddrType(txin_type, multisig=False) |
||||
|
address = self.jade.get_receive_address(self._network(), path, variant=script_variant) |
||||
|
return address |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def register_multisig(self, multisig_name, txin_type, sorted, threshold, signers): |
||||
|
self.authenticate() |
||||
|
variant = self._convertAddrType(txin_type, multisig=True) |
||||
|
return self.jade.register_multisig(self._network(), multisig_name, variant, sorted, threshold, signers) |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def show_address_multi(self, multisig_name, paths): |
||||
|
self.authenticate() |
||||
|
return self.jade.get_receive_address(self._network(), paths, multisig_name=multisig_name) |
||||
|
|
||||
|
class Jade_KeyStore(Hardware_KeyStore): |
||||
|
hw_type = 'jade' |
||||
|
device = 'Jade' |
||||
|
|
||||
|
plugin: 'JadePlugin' |
||||
|
|
||||
|
def get_client(self): |
||||
|
return self.plugin.get_client(self) |
||||
|
|
||||
|
def decrypt_message(self, sequence, message, password): |
||||
|
raise UserFacingException(_('Encryption and decryption are not implemented by {}').format(self.device)) |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def sign_message(self, sequence, message, password, *, script_type=None): |
||||
|
self.handler.show_message(_("Please confirm signing the message with your Jade device...")) |
||||
|
try: |
||||
|
client = self.get_client() |
||||
|
bip32_path_prefix = self.get_derivation_prefix() |
||||
|
return client.sign_message(bip32_path_prefix, sequence, message) |
||||
|
finally: |
||||
|
self.handler.finished() |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def sign_transaction(self, tx, password): |
||||
|
if tx.is_complete(): |
||||
|
return |
||||
|
|
||||
|
self.handler.show_message(_("Preparing to sign transaction ...")) |
||||
|
try: |
||||
|
wallet = self.handler.get_wallet() |
||||
|
is_multisig = _is_multisig(wallet) |
||||
|
|
||||
|
# Fetch inputs of the transaction to sign |
||||
|
jade_inputs = [] |
||||
|
for txin in tx.inputs(): |
||||
|
pubkey, path = self.find_my_pubkey_in_txinout(txin) |
||||
|
witness_input = txin.script_type in ['p2wpkh-p2sh', 'p2wsh-p2sh', 'p2wpkh', 'p2wsh'] |
||||
|
redeem_script = Transaction.get_preimage_script(txin) |
||||
|
redeem_script = bytes.fromhex(redeem_script) if redeem_script is not None else None |
||||
|
input_tx = txin.utxo |
||||
|
input_tx = bytes.fromhex(input_tx.serialize()) if input_tx is not None else None |
||||
|
|
||||
|
# Build the input and add to the list - include some host entropy for AE sigs (although we won't verify) |
||||
|
jade_inputs.append({'is_witness': witness_input, |
||||
|
'input_tx': input_tx, |
||||
|
'script': redeem_script, |
||||
|
'path': path}) |
||||
|
|
||||
|
# Change detection |
||||
|
change = [None] * len(tx.outputs()) |
||||
|
for index, txout in enumerate(tx.outputs()): |
||||
|
if txout.is_mine and txout.is_change: |
||||
|
if is_multisig: |
||||
|
# Multisig - wallet details must be registered on Jade hw |
||||
|
multisig_name = _register_multisig_wallet(wallet, self, txout.address) |
||||
|
|
||||
|
# Jade only needs the path suffix(es) and the multisig registration |
||||
|
# name to generate the address, as the fixed derivation part is |
||||
|
# embedded in the multisig wallet registration record |
||||
|
# NOTE: all cosigners have same path suffix |
||||
|
path_suffix = wallet.get_address_index(txout.address) |
||||
|
paths = [path_suffix] * wallet.n |
||||
|
change[index] = {'multisig_name': multisig_name, 'paths': paths} |
||||
|
else: |
||||
|
# Pass entire path |
||||
|
pubkey, path = self.find_my_pubkey_in_txinout(txout) |
||||
|
change[index] = {'path':path, 'variant': txout.script_type} |
||||
|
|
||||
|
# The txn itself |
||||
|
txn_bytes = bytes.fromhex(tx.serialize_to_network()) |
||||
|
|
||||
|
# Request Jade generate the signatures for our inputs. |
||||
|
# Change details are passed to be validated on the hw (user does not confirm) |
||||
|
self.handler.show_message(_("Please confirm the transaction details on your Jade device...")) |
||||
|
client = self.get_client() |
||||
|
signatures = client.sign_tx(txn_bytes, jade_inputs, change) |
||||
|
assert len(signatures) == len(tx.inputs()) |
||||
|
|
||||
|
# Inject signatures into tx |
||||
|
for index, (txin, signature) in enumerate(zip(tx.inputs(), signatures)): |
||||
|
pubkey, path = self.find_my_pubkey_in_txinout(txin) |
||||
|
if pubkey is not None and signature is not None: |
||||
|
tx.add_signature_to_txin(txin_idx=index, |
||||
|
signing_pubkey=pubkey.hex(), |
||||
|
sig=signature.hex()) |
||||
|
finally: |
||||
|
self.handler.finished() |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def show_address(self, sequence, txin_type): |
||||
|
self.handler.show_message(_("Showing address ...")) |
||||
|
try: |
||||
|
client = self.get_client() |
||||
|
bip32_path_prefix = self.get_derivation_prefix() |
||||
|
return client.show_address(bip32_path_prefix, sequence, txin_type) |
||||
|
finally: |
||||
|
self.handler.finished() |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def register_multisig(self, name, txin_type, sorted, threshold, signers): |
||||
|
self.handler.show_message(_("Please confirm the multisig wallet details on your Jade device...")) |
||||
|
try: |
||||
|
client = self.get_client() |
||||
|
return client.register_multisig(name, txin_type, sorted, threshold, signers) |
||||
|
finally: |
||||
|
self.handler.finished() |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def show_address_multi(self, multisig_name, paths): |
||||
|
self.handler.show_message(_("Showing address ...")) |
||||
|
try: |
||||
|
client = self.get_client() |
||||
|
return client.show_address_multi(multisig_name, paths) |
||||
|
finally: |
||||
|
self.handler.finished() |
||||
|
|
||||
|
|
||||
|
class JadePlugin(HW_PluginBase): |
||||
|
keystore_class = Jade_KeyStore |
||||
|
minimum_library = (0, 0, 1) |
||||
|
DEVICE_IDS = [(0x10c4, 0xea60), (0x1a86, 0x55d4)] |
||||
|
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') |
||||
|
MIN_SUPPORTED_FW_VERSION = (0, 1, 32) |
||||
|
|
||||
|
# For testing with qemu simulator (experimental) |
||||
|
SIMULATOR_PATH = None # 'tcp:127.0.0.1:2222' |
||||
|
SIMULATOR_TEST_SEED = None # bytes.fromhex('b90e532426d0dc20fffe01037048c018e940300038b165c211915c672e07762c') |
||||
|
|
||||
|
def enumerate_serial(self): |
||||
|
# Jade is not really an HID device, it shows as a serial/com port device. |
||||
|
# Scan com ports looking for the relevant vid and pid, and use 'path' to |
||||
|
# hold the path to the serial port device, eg. /dev/ttyUSB0 |
||||
|
devices = [] |
||||
|
for devinfo in list_ports.comports(): |
||||
|
device_product_key = (devinfo.vid, devinfo.pid) |
||||
|
if device_product_key in self.DEVICE_IDS: |
||||
|
device = Device(path=devinfo.device, |
||||
|
interface_number=-1, |
||||
|
id_=devinfo.serial_number, |
||||
|
product_key=device_product_key, |
||||
|
usage_page=-1, |
||||
|
transport_ui_string=devinfo.device) |
||||
|
devices.append(device) |
||||
|
|
||||
|
# Maybe look for Jade Qemu simulator if the vars are set (experimental) |
||||
|
if self.SIMULATOR_PATH is not None and self.SIMULATOR_TEST_SEED is not None: |
||||
|
try: |
||||
|
# If we can connect to a simulator and poke a seed in, add that too |
||||
|
client = Jade_Client(self.SIMULATOR_PATH, plugin=self) |
||||
|
device = Device(path=self.SIMULATOR_PATH, |
||||
|
interface_number=-1, |
||||
|
id_='Jade Qemu Simulator', |
||||
|
product_key=self.DEVICE_IDS[0], |
||||
|
usage_page=-1, |
||||
|
transport_ui_string='simulator') |
||||
|
if client.jade.set_seed(self.SIMULATOR_TEST_SEED): |
||||
|
devices.append(device) |
||||
|
client.close() |
||||
|
except Exception as e: |
||||
|
# If we get any sort of error do not add the simulator |
||||
|
_logger.debug("Failed to connect to Jade simulator at {}".format(self.SIMULATOR_PATH)) |
||||
|
_logger.debug(e) |
||||
|
|
||||
|
return devices |
||||
|
|
||||
|
def __init__(self, parent, config, name): |
||||
|
HW_PluginBase.__init__(self, parent, config, name) |
||||
|
|
||||
|
self.libraries_available = self.check_libraries_available() |
||||
|
if not self.libraries_available: |
||||
|
return |
||||
|
|
||||
|
# Register our own serial/com port scanning function |
||||
|
self.device_manager().register_enumerate_func(self.enumerate_serial) |
||||
|
|
||||
|
def get_library_version(self): |
||||
|
try: |
||||
|
from . import jadepy |
||||
|
version = jadepy.__version__ |
||||
|
except ImportError: |
||||
|
raise |
||||
|
except: |
||||
|
version = "unknown" |
||||
|
return version |
||||
|
|
||||
|
@runs_in_hwd_thread |
||||
|
def create_client(self, device, handler): |
||||
|
client = Jade_Client(device.path, plugin=self) |
||||
|
|
||||
|
# Check minimum supported firmware version |
||||
|
if self.MIN_SUPPORTED_FW_VERSION > client.fwversion: |
||||
|
msg = (_('Outdated {} firmware for device labelled {}. Please ' |
||||
|
'update using a Blockstream Green companion app') |
||||
|
.format(self.device, client.label())) |
||||
|
self.logger.info(msg) |
||||
|
|
||||
|
if handler: |
||||
|
handler.show_error(msg) |
||||
|
|
||||
|
raise OutdatedHwFirmwareException(msg) |
||||
|
|
||||
|
return client |
||||
|
|
||||
|
def setup_device(self, device_info, wizard, purpose): |
||||
|
device_id = device_info.device.id_ |
||||
|
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) |
||||
|
|
||||
|
# Call authenticate on hww to ensure unlocked and suitable for network |
||||
|
# May involve user entering PIN on (or even setting up!) hardware device |
||||
|
wizard.run_task_without_blocking_gui(task=lambda: client.authenticate()) |
||||
|
return client |
||||
|
|
||||
|
def get_xpub(self, device_id, derivation, xtype, wizard): |
||||
|
if xtype not in self.SUPPORTED_XTYPES: |
||||
|
raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) |
||||
|
client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) |
||||
|
xpub = client.get_xpub(derivation, xtype) |
||||
|
return xpub |
||||
|
|
||||
|
def show_address(self, wallet, address, keystore=None): |
||||
|
if keystore is None: |
||||
|
keystore = wallet.get_keystore() |
||||
|
if not self.show_address_helper(wallet, address, keystore): |
||||
|
return |
||||
|
|
||||
|
path_suffix = wallet.get_address_index(address) |
||||
|
if _is_multisig(wallet): |
||||
|
# Multisig - wallet details must be registered on Jade hw |
||||
|
multisig_name = _register_multisig_wallet(wallet, keystore, address) |
||||
|
|
||||
|
# Jade only needs the path suffix(es) and the multisig registration |
||||
|
# name to generate the address, as the fixed derivation part is |
||||
|
# embedded in the multisig wallet registration record |
||||
|
# NOTE: all cosigners have same path suffix |
||||
|
paths = [path_suffix] * wallet.n |
||||
|
hw_address = keystore.show_address_multi(multisig_name, paths) |
||||
|
else: |
||||
|
# Single-sig/standard |
||||
|
txin_type = wallet.get_txin_type(address) |
||||
|
hw_address = keystore.show_address(path_suffix, txin_type) |
||||
|
|
||||
|
if hw_address != address: |
||||
|
keystore.handler.show_error(_('The address generated by {} does not match!').format(self.device)) |
@ -0,0 +1,15 @@ |
|||||
|
# Python Jade Library |
||||
|
|
||||
|
This is a slightly modified version of the official [Jade](https://github.com/Blockstream/Jade) python library. |
||||
|
|
||||
|
This modified version was made from tag [0.1.32](https://github.com/Blockstream/Jade/releases/tag/0.1.32). |
||||
|
|
||||
|
Intention is to fold these modifications back into Jade repo, for future api release. |
||||
|
|
||||
|
## Changes |
||||
|
|
||||
|
- Removed BLE module, reducing transitive dependencies |
||||
|
- Comment create_ble() functions |
||||
|
- More robust 'read_cbor_respose()' function - backported from jade master |
||||
|
- Tweak jade_serial.py to unset RTS line - backported from jade master |
||||
|
- _http_request() function removed, so cannot be used as unintentional fallback |
@ -0,0 +1,4 @@ |
|||||
|
from .jade import JadeAPI |
||||
|
from .jade_error import JadeError |
||||
|
|
||||
|
__version__ = "0.0.1" |
@ -0,0 +1,666 @@ |
|||||
|
import cbor |
||||
|
import hashlib |
||||
|
import json |
||||
|
import time |
||||
|
import logging |
||||
|
import collections |
||||
|
import collections.abc |
||||
|
import traceback |
||||
|
import random |
||||
|
import sys |
||||
|
|
||||
|
|
||||
|
# JadeError |
||||
|
from .jade_error import JadeError |
||||
|
|
||||
|
# Low-level comms backends |
||||
|
from .jade_serial import JadeSerialImpl |
||||
|
from .jade_tcp import JadeTCPImpl |
||||
|
|
||||
|
# Not used in electrum wallet |
||||
|
# Removed to reduce transitive dependencies |
||||
|
# from .jade_ble import JadeBleImpl |
||||
|
|
||||
|
|
||||
|
# Default serial connection |
||||
|
DEFAULT_SERIAL_DEVICE = '/dev/ttyUSB0' |
||||
|
DEFAULT_BAUD_RATE = 115200 |
||||
|
DEFAULT_SERIAL_TIMEOUT = 120 |
||||
|
|
||||
|
# Default BLE connection |
||||
|
DEFAULT_BLE_DEVICE_NAME = 'Jade' |
||||
|
DEFAULT_BLE_SERIAL_NUMBER = None |
||||
|
DEFAULT_BLE_SCAN_TIMEOUT = 60 |
||||
|
|
||||
|
# 'jade' logger |
||||
|
logger = logging.getLogger('jade') |
||||
|
device_logger = logging.getLogger('jade-device') |
||||
|
|
||||
|
|
||||
|
# Helper to map bytes-like types into hex-strings |
||||
|
# to make for prettier message-logging |
||||
|
def _hexlify(data): |
||||
|
if data is None: |
||||
|
return None |
||||
|
elif isinstance(data, bytes) or isinstance(data, bytearray): |
||||
|
return data.hex() |
||||
|
elif isinstance(data, list): |
||||
|
return [_hexlify(item) for item in data] |
||||
|
elif isinstance(data, dict): |
||||
|
return {k: _hexlify(v) for k, v in data.items()} |
||||
|
else: |
||||
|
return data |
||||
|
|
||||
|
|
||||
|
# Simple http request function which can be used when a Jade response |
||||
|
# requires an external http call. |
||||
|
# The default implementation used in JadeAPI._jadeRpc() below. |
||||
|
# NOTE: Only available if the 'requests' dependency is available. |
||||
|
|
||||
|
# NOTE: Removed entirely for electrum - so it is not used silently as a fallback. |
||||
|
# (hard error preferred in that case) |
||||
|
# Jade repo api will be improved to make enabling this function more explicit |
||||
|
|
||||
|
# try: |
||||
|
# import requests |
||||
|
# |
||||
|
# def _http_request(params): |
||||
|
# logger.debug('_http_request: {}'.format(params)) |
||||
|
# |
||||
|
# # Use the first non-onion url |
||||
|
# url = [url for url in params['urls'] if not url.endswith('.onion')][0] |
||||
|
# if params['method'] == 'GET': |
||||
|
# assert 'data' not in params, 'Cannot pass body to requests.get' |
||||
|
# f = requests.get(url) |
||||
|
# elif params['method'] == 'POST': |
||||
|
# data = json.dumps(params['data']) |
||||
|
# f = requests.post(url, data) |
||||
|
# |
||||
|
# logger.debug("http_request received reply: {}".format(f.text)) |
||||
|
# |
||||
|
# if f.status_code != 200: |
||||
|
# logger.error("http error {} : {}".format(f.status_code, f.text)) |
||||
|
# raise ValueError(f.status_code) |
||||
|
# |
||||
|
# assert params['accept'] == 'json' |
||||
|
# f = f.json() |
||||
|
# |
||||
|
# return {'body': f} |
||||
|
# |
||||
|
# except ImportError as e: |
||||
|
# logger.warn(e) |
||||
|
# logger.warn('Default _http_requests() function will not be available') |
||||
|
# |
||||
|
|
||||
|
# |
||||
|
# High-Level Jade Client API |
||||
|
# Builds on a JadeInterface to provide a meaningful API |
||||
|
# |
||||
|
# Either: |
||||
|
# a) use with JadeAPI.create_[serial|ble]() as jade: |
||||
|
# (recommended) |
||||
|
# or: |
||||
|
# b) use JadeAPI.create_[serial|ble], then call connect() before |
||||
|
# using, and disconnect() when finished |
||||
|
# (caveat cranium) |
||||
|
# or: |
||||
|
# c) use ctor to wrap existing JadeInterface instance |
||||
|
# (caveat cranium) |
||||
|
# |
||||
|
class JadeAPI: |
||||
|
def __init__(self, jade): |
||||
|
assert jade is not None |
||||
|
self.jade = jade |
||||
|
|
||||
|
def __enter__(self): |
||||
|
self.connect() |
||||
|
return self |
||||
|
|
||||
|
def __exit__(self, exc_type, exc, tb): |
||||
|
if (exc_type): |
||||
|
logger.error("Exception causing JadeAPI context exit.") |
||||
|
logger.error(exc_type) |
||||
|
logger.error(exc) |
||||
|
traceback.print_tb(tb) |
||||
|
self.disconnect(exc_type is not None) |
||||
|
|
||||
|
@staticmethod |
||||
|
def create_serial(device=None, baud=None, timeout=None): |
||||
|
impl = JadeInterface.create_serial(device, baud, timeout) |
||||
|
return JadeAPI(impl) |
||||
|
|
||||
|
# @staticmethod |
||||
|
# def create_ble(device_name=None, serial_number=None, |
||||
|
# scan_timeout=None, loop=None): |
||||
|
# impl = JadeInterface.create_ble(device_name, serial_number, |
||||
|
# scan_timeout, loop) |
||||
|
# return JadeAPI(impl) |
||||
|
|
||||
|
# Connect underlying interface |
||||
|
def connect(self): |
||||
|
self.jade.connect() |
||||
|
|
||||
|
# Disconnect underlying interface |
||||
|
def disconnect(self, drain=False): |
||||
|
self.jade.disconnect(drain) |
||||
|
|
||||
|
# Drain all output from the interface |
||||
|
def drain(self): |
||||
|
self.jade.drain() |
||||
|
|
||||
|
# Raise any returned error as an exception |
||||
|
@staticmethod |
||||
|
def _get_result_or_raise_error(reply): |
||||
|
if 'error' in reply: |
||||
|
e = reply['error'] |
||||
|
raise JadeError(e.get('code'), e.get('message'), e.get('data')) |
||||
|
|
||||
|
return reply['result'] |
||||
|
|
||||
|
# Helper to call wrapper interface rpc invoker |
||||
|
def _jadeRpc(self, method, params=None, inputid=None, http_request_fn=None, long_timeout=False): |
||||
|
newid = inputid if inputid else str(random.randint(100000, 999999)) |
||||
|
request = self.jade.build_request(newid, method, params) |
||||
|
reply = self.jade.make_rpc_call(request, long_timeout) |
||||
|
result = self._get_result_or_raise_error(reply) |
||||
|
|
||||
|
# The Jade can respond with a request for interaction with a remote |
||||
|
# http server. This is used for interaction with the pinserver but the |
||||
|
# code below acts as a dumb proxy and simply makes the http request and |
||||
|
# forwards the response back to the Jade. |
||||
|
# Note: the function called to make the http-request can be passed in, |
||||
|
# or it can default to the simple _http_request() function above, if available. |
||||
|
if isinstance(result, collections.abc.Mapping) and 'http_request' in result: |
||||
|
this_module = sys.modules[__name__] |
||||
|
make_http_request = http_request_fn or getattr(this_module, '_http_request', None) |
||||
|
assert make_http_request, 'Default _http_request() function not available' |
||||
|
|
||||
|
http_request = result['http_request'] |
||||
|
http_response = make_http_request(http_request['params']) |
||||
|
return self._jadeRpc( |
||||
|
http_request['on-reply'], |
||||
|
http_response['body'], |
||||
|
http_request_fn=make_http_request, |
||||
|
long_timeout=long_timeout) |
||||
|
|
||||
|
return result |
||||
|
|
||||
|
# Get version information from the hw |
||||
|
def get_version_info(self): |
||||
|
return self._jadeRpc('get_version_info') |
||||
|
|
||||
|
# Add client entropy to the hw rng |
||||
|
def add_entropy(self, entropy): |
||||
|
params = {'entropy': entropy} |
||||
|
return self._jadeRpc('add_entropy', params) |
||||
|
|
||||
|
# OTA new firmware |
||||
|
def ota_update(self, fwcmp, fwlen, chunksize, cb): |
||||
|
|
||||
|
cmphasher = hashlib.sha256() |
||||
|
cmphasher.update(fwcmp) |
||||
|
cmphash = cmphasher.digest() |
||||
|
cmplen = len(fwcmp) |
||||
|
|
||||
|
# Initiate OTA |
||||
|
params = {'fwsize': fwlen, |
||||
|
'cmpsize': cmplen, |
||||
|
'cmphash': cmphash} |
||||
|
|
||||
|
result = self._jadeRpc('ota', params) |
||||
|
assert result is True |
||||
|
|
||||
|
# Write binary chunks |
||||
|
written = 0 |
||||
|
while written < cmplen: |
||||
|
remaining = cmplen - written |
||||
|
length = min(remaining, chunksize) |
||||
|
chunk = bytes(fwcmp[written:written + length]) |
||||
|
result = self._jadeRpc('ota_data', chunk) |
||||
|
assert result is True |
||||
|
written += length |
||||
|
|
||||
|
if (cb): |
||||
|
cb(written, cmplen) |
||||
|
|
||||
|
# All binary data uploaded |
||||
|
return self._jadeRpc('ota_complete') |
||||
|
|
||||
|
# Run (debug) healthcheck on the hw |
||||
|
def run_remote_selfcheck(self): |
||||
|
return self._jadeRpc('debug_selfcheck', long_timeout=True) |
||||
|
|
||||
|
# Set the (debug) mnemonic |
||||
|
def set_mnemonic(self, mnemonic, passphrase=None, temporary_wallet=False): |
||||
|
params = {'mnemonic': mnemonic, 'passphrase': passphrase, |
||||
|
'temporary_wallet': temporary_wallet} |
||||
|
return self._jadeRpc('debug_set_mnemonic', params) |
||||
|
|
||||
|
# Set the (debug) seed |
||||
|
def set_seed(self, seed, temporary_wallet=False): |
||||
|
params = {'seed': seed, 'temporary_wallet': temporary_wallet} |
||||
|
return self._jadeRpc('debug_set_mnemonic', params) |
||||
|
|
||||
|
# Override the pinserver details on the hww |
||||
|
def set_pinserver(self, urlA=None, urlB=None, pubkey=None, cert=None): |
||||
|
params = {} |
||||
|
if urlA is not None or urlB is not None: |
||||
|
params['urlA'] = urlA |
||||
|
params['urlB'] = urlB |
||||
|
if pubkey is not None: |
||||
|
params['pubkey'] = pubkey |
||||
|
if cert is not None: |
||||
|
params['certificate'] = cert |
||||
|
return self._jadeRpc('update_pinserver', params) |
||||
|
|
||||
|
# Reset the pinserver details on the hww to their defaults |
||||
|
def reset_pinserver(self, reset_details, reset_certificate): |
||||
|
params = {'reset_details': reset_details, |
||||
|
'reset_certificate': reset_certificate} |
||||
|
return self._jadeRpc('update_pinserver', params) |
||||
|
|
||||
|
# Trigger user authentication on the hw |
||||
|
# Involves pinserver handshake |
||||
|
def auth_user(self, network, http_request_fn=None): |
||||
|
params = {'network': network} |
||||
|
return self._jadeRpc('auth_user', params, |
||||
|
http_request_fn=http_request_fn, |
||||
|
long_timeout=True) |
||||
|
|
||||
|
# Get xpub given a path |
||||
|
def get_xpub(self, network, path): |
||||
|
params = {'network': network, 'path': path} |
||||
|
return self._jadeRpc('get_xpub', params) |
||||
|
|
||||
|
# Get registered multisig wallets |
||||
|
def get_registered_multisigs(self): |
||||
|
return self._jadeRpc('get_registered_multisigs') |
||||
|
|
||||
|
# Register a multisig wallet |
||||
|
def register_multisig(self, network, multisig_name, variant, sorted_keys, threshold, signers): |
||||
|
params = {'network': network, 'multisig_name': multisig_name, |
||||
|
'descriptor': {'variant': variant, 'sorted': sorted_keys, |
||||
|
'threshold': threshold, 'signers': signers}} |
||||
|
return self._jadeRpc('register_multisig', params) |
||||
|
|
||||
|
# Get receive-address for parameters |
||||
|
def get_receive_address(self, *args, recovery_xpub=None, csv_blocks=0, |
||||
|
variant=None, multisig_name=None): |
||||
|
if multisig_name is not None: |
||||
|
assert len(args) == 2 |
||||
|
keys = ['network', 'paths', 'multisig_name'] |
||||
|
args += (multisig_name,) |
||||
|
elif variant is not None: |
||||
|
assert len(args) == 2 |
||||
|
keys = ['network', 'path', 'variant'] |
||||
|
args += (variant,) |
||||
|
else: |
||||
|
assert len(args) == 4 |
||||
|
keys = ['network', 'subaccount', 'branch', 'pointer', 'recovery_xpub', 'csv_blocks'] |
||||
|
args += (recovery_xpub, csv_blocks) |
||||
|
return self._jadeRpc('get_receive_address', dict(zip(keys, args))) |
||||
|
|
||||
|
# Sign a message |
||||
|
def sign_message(self, path, message, use_ae_signatures=False, |
||||
|
ae_host_commitment=None, ae_host_entropy=None): |
||||
|
if use_ae_signatures: |
||||
|
# Anti-exfil protocol: |
||||
|
# We send the signing request and receive the signer-commitment in |
||||
|
# reply once the user confirms. |
||||
|
# We can then request the actual signature passing the ae-entropy. |
||||
|
params = {'path': path, 'message': message, 'ae_host_commitment': ae_host_commitment} |
||||
|
signer_commitment = self._jadeRpc('sign_message', params) |
||||
|
params = {'ae_host_entropy': ae_host_entropy} |
||||
|
signature = self._jadeRpc('get_signature', params) |
||||
|
return signer_commitment, signature |
||||
|
else: |
||||
|
# Standard EC signature, simple case |
||||
|
params = {'path': path, 'message': message} |
||||
|
return self._jadeRpc('sign_message', params) |
||||
|
|
||||
|
# Get a Liquid master blinding key |
||||
|
def get_master_blinding_key(self): |
||||
|
return self._jadeRpc('get_master_blinding_key') |
||||
|
|
||||
|
# Get a Liquid public blinding key for a given script |
||||
|
def get_blinding_key(self, script): |
||||
|
params = {'script': script} |
||||
|
return self._jadeRpc('get_blinding_key', params) |
||||
|
|
||||
|
# Get the shared secret to unblind a tx, given the receiving script on |
||||
|
# our side and the pubkey of the sender (sometimes called "nonce" in |
||||
|
# Liquid). Optionally fetch our blinding pubkey also. |
||||
|
def get_shared_nonce(self, script, their_pubkey, include_pubkey=False): |
||||
|
params = {'script': script, 'their_pubkey': their_pubkey, 'include_pubkey': include_pubkey} |
||||
|
return self._jadeRpc('get_shared_nonce', params) |
||||
|
|
||||
|
# Get a "trusted" blinding factor to blind an output. Normally the blinding |
||||
|
# factors are generated and returned in the `get_commitments` call, but |
||||
|
# for the last output the VBF must be generated on the host side, so this |
||||
|
# call allows the host to get a valid ABF to compute the generator and |
||||
|
# then the "final" VBF. Nonetheless, this call is kept generic, and can |
||||
|
# also generate VBFs, thus the "type" parameter. |
||||
|
# `hash_prevouts` is computed as specified in BIP143 (double SHA of all |
||||
|
# the outpoints being spent as input. It's not checked right away since |
||||
|
# at this point Jade doesn't know anything about the tx we are referring |
||||
|
# to. It will be checked later during `sign_liquid_tx`. |
||||
|
# `output_index` is the output we are trying to blind. |
||||
|
# `type` can either be "ASSET" or "VALUE" to generate ABFs or VBFs. |
||||
|
def get_blinding_factor(self, hash_prevouts, output_index, type): |
||||
|
params = {'hash_prevouts': hash_prevouts, |
||||
|
'output_index': output_index, |
||||
|
'type': type} |
||||
|
return self._jadeRpc('get_blinding_factor', params) |
||||
|
|
||||
|
# Generate the blinding factors and commitments for a given output. |
||||
|
# Can optionally get a "custom" VBF, normally used for the last |
||||
|
# input where the VBF is not random, but generated accordingly to |
||||
|
# all the others. |
||||
|
# `hash_prevouts` and `output_index` have the same meaning as in |
||||
|
# the `get_blinding_factor` call. |
||||
|
# NOTE: the `asset_id` should be passed as it is normally displayed, so |
||||
|
# reversed compared to the "consensus" representation. |
||||
|
def get_commitments(self, |
||||
|
asset_id, |
||||
|
value, |
||||
|
hash_prevouts, |
||||
|
output_index, |
||||
|
vbf=None): |
||||
|
params = {'asset_id': asset_id, |
||||
|
'value': value, |
||||
|
'hash_prevouts': hash_prevouts, |
||||
|
'output_index': output_index} |
||||
|
if vbf is not None: |
||||
|
params['vbf'] = vbf |
||||
|
return self._jadeRpc('get_commitments', params) |
||||
|
|
||||
|
# Common code for sending btc- and liquid- tx-inputs and receiving the |
||||
|
# signatures. Handles standard EC and AE signing schemes. |
||||
|
def _send_tx_inputs(self, base_id, inputs, use_ae_signatures): |
||||
|
if use_ae_signatures: |
||||
|
# Anti-exfil protocol: |
||||
|
# We send one message per input (which includes host-commitment *but |
||||
|
# not* the host entropy) and receive the signer-commitment in reply. |
||||
|
# Once all n input messages are sent, we can request the actual signatures |
||||
|
# (as the user has a chance to confirm/cancel at this point). |
||||
|
# We request the signatures passing the ae-entropy for each one. |
||||
|
# Send inputs one at a time, receiving 'signer-commitment' in reply |
||||
|
signer_commitments = [] |
||||
|
host_ae_entropy_values = [] |
||||
|
for txinput in inputs: |
||||
|
# ae-protocol - do not send the host entropy immediately |
||||
|
txinput = txinput.copy() # shallow copy |
||||
|
host_ae_entropy_values.append(txinput.pop('ae_host_entropy', None)) |
||||
|
|
||||
|
base_id += 1 |
||||
|
input_id = str(base_id) |
||||
|
reply = self._jadeRpc('tx_input', txinput, input_id) |
||||
|
signer_commitments.append(reply) |
||||
|
|
||||
|
# Request the signatures one at a time, sending the entropy |
||||
|
signatures = [] |
||||
|
for (i, host_ae_entropy) in enumerate(host_ae_entropy_values, 1): |
||||
|
base_id += 1 |
||||
|
sig_id = str(base_id) |
||||
|
params = {'ae_host_entropy': host_ae_entropy} |
||||
|
reply = self._jadeRpc('get_signature', params, sig_id) |
||||
|
signatures.append(reply) |
||||
|
|
||||
|
assert len(signatures) == len(inputs) |
||||
|
return list(zip(signer_commitments, signatures)) |
||||
|
else: |
||||
|
# Legacy protocol: |
||||
|
# We send one message per input - without expecting replies. |
||||
|
# Once all n input messages are sent, the hw then sends all n replies |
||||
|
# (as the user has a chance to confirm/cancel at this point). |
||||
|
# Then receive all n replies for the n signatures. |
||||
|
# NOTE: *NOT* a sequence of n blocking rpc calls. |
||||
|
# NOTE: at some point this flow should be removed in favour of the one |
||||
|
# above, albeit without passing anti-exfil entropy or commitment data. |
||||
|
|
||||
|
# Send all n inputs |
||||
|
requests = [] |
||||
|
for txinput in inputs: |
||||
|
base_id += 1 |
||||
|
msg_id = str(base_id) |
||||
|
request = self.jade.build_request(msg_id, 'tx_input', txinput) |
||||
|
self.jade.write_request(request) |
||||
|
requests.append(request) |
||||
|
time.sleep(0.1) |
||||
|
|
||||
|
# Receive all n signatures |
||||
|
signatures = [] |
||||
|
for request in requests: |
||||
|
reply = self.jade.read_response() |
||||
|
self.jade.validate_reply(request, reply) |
||||
|
signature = self._get_result_or_raise_error(reply) |
||||
|
signatures.append(signature) |
||||
|
|
||||
|
assert len(signatures) == len(inputs) |
||||
|
return signatures |
||||
|
|
||||
|
# Sign a Liquid txn |
||||
|
def sign_liquid_tx(self, network, txn, inputs, commitments, change, use_ae_signatures=False): |
||||
|
# 1st message contains txn and number of inputs we are going to send. |
||||
|
# Reply ok if that corresponds to the expected number of inputs (n). |
||||
|
base_id = 100 * random.randint(1000, 9999) |
||||
|
params = {'network': network, |
||||
|
'txn': txn, |
||||
|
'num_inputs': len(inputs), |
||||
|
'trusted_commitments': commitments, |
||||
|
'use_ae_signatures': use_ae_signatures, |
||||
|
'change': change} |
||||
|
|
||||
|
reply = self._jadeRpc('sign_liquid_tx', params, str(base_id)) |
||||
|
assert reply |
||||
|
|
||||
|
# Send inputs and receive signatures |
||||
|
return self._send_tx_inputs(base_id, inputs, use_ae_signatures) |
||||
|
|
||||
|
# Sign a txn |
||||
|
def sign_tx(self, network, txn, inputs, change, use_ae_signatures=False): |
||||
|
# 1st message contains txn and number of inputs we are going to send. |
||||
|
# Reply ok if that corresponds to the expected number of inputs (n). |
||||
|
base_id = 100 * random.randint(1000, 9999) |
||||
|
params = {'network': network, |
||||
|
'txn': txn, |
||||
|
'num_inputs': len(inputs), |
||||
|
'use_ae_signatures': use_ae_signatures, |
||||
|
'change': change} |
||||
|
|
||||
|
reply = self._jadeRpc('sign_tx', params, str(base_id)) |
||||
|
assert reply |
||||
|
|
||||
|
# Send inputs and receive signatures |
||||
|
return self._send_tx_inputs(base_id, inputs, use_ae_signatures) |
||||
|
|
||||
|
|
||||
|
# |
||||
|
# Mid-level interface to Jade |
||||
|
# Wraps either a serial or a ble connection |
||||
|
# Calls to send and receive bytes and cbor messages over the interface. |
||||
|
# |
||||
|
# Either: |
||||
|
# a) use wrapped with JadeAPI |
||||
|
# (recommended) |
||||
|
# or: |
||||
|
# b) use with JadeInterface.create_[serial|ble]() as jade: |
||||
|
# ... |
||||
|
# or: |
||||
|
# c) use JadeInterface.create_[serial|ble], then call connect() before |
||||
|
# using, and disconnect() when finished |
||||
|
# (caveat cranium) |
||||
|
# or: |
||||
|
# d) use ctor to wrap existing low-level implementation instance |
||||
|
# (caveat cranium) |
||||
|
# |
||||
|
class JadeInterface: |
||||
|
def __init__(self, impl): |
||||
|
assert impl is not None |
||||
|
self.impl = impl |
||||
|
|
||||
|
def __enter__(self): |
||||
|
self.connect() |
||||
|
return self |
||||
|
|
||||
|
def __exit__(self, exc_type, exc, tb): |
||||
|
if (exc_type): |
||||
|
logger.error("Exception causing JadeInterface context exit.") |
||||
|
logger.error(exc_type) |
||||
|
logger.error(exc) |
||||
|
traceback.print_tb(tb) |
||||
|
self.disconnect(exc_type is not None) |
||||
|
|
||||
|
@staticmethod |
||||
|
def create_serial(device=None, baud=None, timeout=None): |
||||
|
if device and JadeTCPImpl.isSupportedDevice(device): |
||||
|
impl = JadeTCPImpl(device) |
||||
|
else: |
||||
|
impl = JadeSerialImpl(device or DEFAULT_SERIAL_DEVICE, |
||||
|
baud or DEFAULT_BAUD_RATE, |
||||
|
timeout or DEFAULT_SERIAL_TIMEOUT) |
||||
|
return JadeInterface(impl) |
||||
|
|
||||
|
# @staticmethod |
||||
|
# def create_ble(device_name=None, serial_number=None, |
||||
|
# scan_timeout=None, loop=None): |
||||
|
# impl = JadeBleImpl(device_name or DEFAULT_BLE_DEVICE_NAME, |
||||
|
# serial_number or DEFAULT_BLE_SERIAL_NUMBER, |
||||
|
# scan_timeout or DEFAULT_BLE_SCAN_TIMEOUT, |
||||
|
# loop=loop) |
||||
|
# return JadeInterface(impl) |
||||
|
|
||||
|
def connect(self): |
||||
|
self.impl.connect() |
||||
|
|
||||
|
def disconnect(self, drain=False): |
||||
|
if drain: |
||||
|
self.drain() |
||||
|
|
||||
|
self.impl.disconnect() |
||||
|
|
||||
|
def drain(self): |
||||
|
logger.warn("Draining interface...") |
||||
|
drained = bytearray() |
||||
|
finished = False |
||||
|
|
||||
|
while not finished: |
||||
|
byte_ = self.impl.read(1) |
||||
|
drained.extend(byte_) |
||||
|
finished = byte_ == b'' |
||||
|
|
||||
|
if finished or byte_ == b'\n' or len(drained) > 256: |
||||
|
try: |
||||
|
device_logger.warn(drained.decode('utf-8')) |
||||
|
except Exception as e: |
||||
|
# Dump the bytes raw and as hex if decoding as utf-8 failed |
||||
|
device_logger.warn("Raw:") |
||||
|
device_logger.warn(drained) |
||||
|
device_logger.warn("----") |
||||
|
device_logger.warn("Hex dump:") |
||||
|
device_logger.warn(drained.hex()) |
||||
|
|
||||
|
# Clear and loop to continue collecting |
||||
|
drained.clear() |
||||
|
|
||||
|
@staticmethod |
||||
|
def build_request(input_id, method, params=None): |
||||
|
request = {"method": method, "id": input_id} |
||||
|
if params is not None: |
||||
|
request["params"] = params |
||||
|
return request |
||||
|
|
||||
|
@staticmethod |
||||
|
def serialise_cbor_request(request): |
||||
|
dump = cbor.dumps(request) |
||||
|
len_dump = len(dump) |
||||
|
if 'method' in request and 'ota_data' in request['method']: |
||||
|
msg = 'Sending ota_data message {} as cbor of size {}'.format(request['id'], len_dump) |
||||
|
logger.info(msg) |
||||
|
else: |
||||
|
logger.info('Sending: {} as cbor of size {}'.format(_hexlify(request), len_dump)) |
||||
|
return dump |
||||
|
|
||||
|
def write(self, bytes_): |
||||
|
logger.debug("Sending: {} bytes".format(len(bytes_))) |
||||
|
wrote = self.impl.write(bytes_) |
||||
|
logger.debug("Sent: {} bytes".format(len(bytes_))) |
||||
|
return wrote |
||||
|
|
||||
|
def write_request(self, request): |
||||
|
msg = self.serialise_cbor_request(request) |
||||
|
written = 0 |
||||
|
while written < len(msg): |
||||
|
written += self.write(msg[written:]) |
||||
|
|
||||
|
def read(self, n): |
||||
|
logger.debug("Reading {} bytes...".format(n)) |
||||
|
bytes_ = self.impl.read(n) |
||||
|
logger.debug("Received: {} bytes".format(len(bytes_))) |
||||
|
return bytes_ |
||||
|
|
||||
|
def read_cbor_message(self): |
||||
|
while True: |
||||
|
# 'self' is sufficiently 'file-like' to act as a load source. |
||||
|
# Throws EOFError on end of stream/timeout/lost-connection etc. |
||||
|
message = cbor.load(self) |
||||
|
|
||||
|
if isinstance(message, collections.abc.Mapping): |
||||
|
# A message response (to a prior request) |
||||
|
if 'id' in message: |
||||
|
logger.info("Received msg: {}".format(_hexlify(message))) |
||||
|
return message |
||||
|
|
||||
|
# A log message - handle as normal |
||||
|
if 'log' in message: |
||||
|
response = message['log'] |
||||
|
log_method = device_logger.error |
||||
|
try: |
||||
|
response = message['log'].decode("utf-8") |
||||
|
log_methods = { |
||||
|
'E': device_logger.error, |
||||
|
'W': device_logger.warn, |
||||
|
'I': device_logger.info, |
||||
|
'D': device_logger.debug, |
||||
|
'V': device_logger.debug, |
||||
|
} |
||||
|
if len(response) > 1 and response[1] == ' ': |
||||
|
lvl = response[0] |
||||
|
log_method = log_methods.get(lvl, device_logger.error) |
||||
|
except Exception as e: |
||||
|
logger.error('Error processing log message: {}'.format(e)) |
||||
|
log_method('>> {}'.format(response)) |
||||
|
continue |
||||
|
|
||||
|
# Unknown/unhandled/unexpected message |
||||
|
logger.error("Unhandled message received") |
||||
|
device_logger.error(message) |
||||
|
|
||||
|
def read_response(self, long_timeout=False): |
||||
|
while True: |
||||
|
try: |
||||
|
return self.read_cbor_message() |
||||
|
except EOFError as e: |
||||
|
if not long_timeout: |
||||
|
raise |
||||
|
|
||||
|
@staticmethod |
||||
|
def validate_reply(request, reply): |
||||
|
assert isinstance(reply, dict) and 'id' in reply |
||||
|
assert ('result' in reply) != ('error' in reply) |
||||
|
assert reply['id'] == request['id'] or \ |
||||
|
reply['id'] == '00' and 'error' in reply |
||||
|
|
||||
|
def make_rpc_call(self, request, long_timeout=False): |
||||
|
# Write outgoing request message |
||||
|
assert isinstance(request, dict) |
||||
|
assert 'id' in request and len(request['id']) > 0 |
||||
|
assert 'method' in request and len(request['method']) > 0 |
||||
|
assert len(request['id']) < 16 and len(request['method']) < 32 |
||||
|
self.write_request(request) |
||||
|
|
||||
|
# Read and validate incoming message |
||||
|
reply = self.read_response(long_timeout) |
||||
|
self.validate_reply(request, reply) |
||||
|
|
||||
|
return reply |
@ -0,0 +1,24 @@ |
|||||
|
class JadeError(Exception): |
||||
|
# RPC error codes |
||||
|
INVALID_REQUEST = -32600 |
||||
|
UNKNOWN_METHOD = -32601 |
||||
|
BAD_PARAMETERS = -32602 |
||||
|
INTERNAL_ERROR = -32603 |
||||
|
|
||||
|
# Implementation specific error codes: -32000 to -32099 |
||||
|
USER_CANCELLED = -32000 |
||||
|
PROTOCOL_ERROR = -32001 |
||||
|
HW_LOCKED = -32002 |
||||
|
NETWORK_MISMATCH = -32003 |
||||
|
|
||||
|
def __init__(self, code, message, data): |
||||
|
self.code = code |
||||
|
self.message = message |
||||
|
self.data = data |
||||
|
|
||||
|
def __repr__(self): |
||||
|
return "JadeError: " + str(self.code) + " - " + self.message \ |
||||
|
+ " (Data: " + repr(self.data) + ")" |
||||
|
|
||||
|
def __str__(self): |
||||
|
return repr(self) |
@ -0,0 +1,64 @@ |
|||||
|
import serial |
||||
|
import logging |
||||
|
|
||||
|
|
||||
|
logger = logging.getLogger('jade.serial') |
||||
|
|
||||
|
|
||||
|
# |
||||
|
# Low-level Serial backend interface to Jade |
||||
|
# Calls to send and receive bytes over the interface. |
||||
|
# Intended for use via JadeInterface wrapper. |
||||
|
# |
||||
|
# Either: |
||||
|
# a) use via JadeInterface.create_serial() (see JadeInterface) |
||||
|
# (recommended) |
||||
|
# or: |
||||
|
# b) use JadeSerialImpl() directly, and call connect() before |
||||
|
# using, and disconnect() when finished, |
||||
|
# (caveat cranium) |
||||
|
# |
||||
|
class JadeSerialImpl: |
||||
|
def __init__(self, device, baud, timeout): |
||||
|
self.device = device |
||||
|
self.baud = baud |
||||
|
self.timeout = timeout |
||||
|
self.ser = None |
||||
|
|
||||
|
def connect(self): |
||||
|
assert self.ser is None |
||||
|
|
||||
|
logger.info('Connecting to {} at {}'.format(self.device, self.baud)) |
||||
|
self.ser = serial.Serial(self.device, self.baud, |
||||
|
timeout=self.timeout, |
||||
|
write_timeout=self.timeout) |
||||
|
assert self.ser is not None |
||||
|
|
||||
|
if not self.ser.is_open: |
||||
|
self.ser.open() |
||||
|
|
||||
|
# Ensure RTS and DTR are not set (as this can cause the hw to reboot) |
||||
|
self.ser.setRTS(False) |
||||
|
self.ser.setDTR(False) |
||||
|
|
||||
|
logger.info('Connected') |
||||
|
|
||||
|
def disconnect(self): |
||||
|
assert self.ser is not None |
||||
|
|
||||
|
# Ensure RTS and DTR are not set (as this can cause the hw to reboot) |
||||
|
# and then close the connection |
||||
|
self.ser.setRTS(False) |
||||
|
self.ser.setDTR(False) |
||||
|
self.ser.close() |
||||
|
|
||||
|
# Reset state |
||||
|
self.ser = None |
||||
|
|
||||
|
def write(self, bytes_): |
||||
|
assert self.ser is not None |
||||
|
return self.ser.write(bytes_) |
||||
|
|
||||
|
def read(self, n): |
||||
|
assert self.ser is not None |
||||
|
return self.ser.read(n) |
@ -0,0 +1,60 @@ |
|||||
|
import socket |
||||
|
import logging |
||||
|
|
||||
|
|
||||
|
logger = logging.getLogger('jade.tcp') |
||||
|
|
||||
|
|
||||
|
# |
||||
|
# Low-level Serial-via-TCP backend interface to Jade |
||||
|
# Calls to send and receive bytes over the interface. |
||||
|
# Intended for use via JadeInterface wrapper. |
||||
|
# |
||||
|
# Either: |
||||
|
# a) use via JadeInterface.create_serial() (see JadeInterface) |
||||
|
# (recommended) |
||||
|
# or: |
||||
|
# b) use JadeTCPImpl() directly, and call connect() before |
||||
|
# using, and disconnect() when finished, |
||||
|
# (caveat cranium) |
||||
|
# |
||||
|
class JadeTCPImpl: |
||||
|
PROTOCOL_PREFIX = 'tcp:' |
||||
|
|
||||
|
@classmethod |
||||
|
def isSupportedDevice(cls, device): |
||||
|
return device is not None and device.startswith(cls.PROTOCOL_PREFIX) |
||||
|
|
||||
|
def __init__(self, device): |
||||
|
assert self.isSupportedDevice(device) |
||||
|
self.device = device |
||||
|
self.tcp_sock = None |
||||
|
|
||||
|
def connect(self): |
||||
|
assert self.isSupportedDevice(self.device) |
||||
|
assert self.tcp_sock is None |
||||
|
|
||||
|
logger.info('Connecting to {}'.format(self.device)) |
||||
|
self.tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
||||
|
|
||||
|
url = self.device[len(self.PROTOCOL_PREFIX):].split(':') |
||||
|
self.tcp_sock.connect((url[0], int(url[1]))) |
||||
|
assert self.tcp_sock is not None |
||||
|
|
||||
|
self.tcp_sock.__enter__() |
||||
|
logger.info('Connected') |
||||
|
|
||||
|
def disconnect(self): |
||||
|
assert self.tcp_sock is not None |
||||
|
self.tcp_sock.__exit__() |
||||
|
|
||||
|
# Reset state |
||||
|
self.tcp_sock = None |
||||
|
|
||||
|
def write(self, bytes_): |
||||
|
assert self.tcp_sock is not None |
||||
|
return self.tcp_sock.send(bytes_) |
||||
|
|
||||
|
def read(self, n): |
||||
|
assert self.tcp_sock is not None |
||||
|
return self.tcp_sock.recv(n) |
@ -0,0 +1,47 @@ |
|||||
|
from functools import partial |
||||
|
|
||||
|
from PyQt5.QtCore import pyqtSignal |
||||
|
from PyQt5.QtWidgets import QLabel, QVBoxLayout |
||||
|
|
||||
|
from electrum.i18n import _ |
||||
|
from electrum.plugin import hook |
||||
|
from electrum.wallet import Standard_Wallet |
||||
|
from electrum.gui.qt.util import WindowModalDialog |
||||
|
|
||||
|
from .jade import JadePlugin |
||||
|
from ..hw_wallet.qt import QtHandlerBase, QtPluginBase |
||||
|
from ..hw_wallet.plugin import only_hook_if_libraries_available |
||||
|
|
||||
|
|
||||
|
class Plugin(JadePlugin, QtPluginBase): |
||||
|
icon_unpaired = "jade_unpaired.png" |
||||
|
icon_paired = "jade.png" |
||||
|
|
||||
|
def create_handler(self, window): |
||||
|
return Jade_Handler(window) |
||||
|
|
||||
|
@only_hook_if_libraries_available |
||||
|
@hook |
||||
|
def receive_menu(self, menu, addrs, wallet): |
||||
|
if type(wallet) is not Standard_Wallet: |
||||
|
return |
||||
|
keystore = wallet.get_keystore() |
||||
|
if type(keystore) == self.keystore_class and len(addrs) == 1: |
||||
|
def show_address(): |
||||
|
keystore.thread.add(partial(self.show_address, wallet, addrs[0])) |
||||
|
menu.addAction(_("Show on Jade"), show_address) |
||||
|
|
||||
|
class Jade_Handler(QtHandlerBase): |
||||
|
setup_signal = pyqtSignal() |
||||
|
auth_signal = pyqtSignal(object, object) |
||||
|
|
||||
|
def __init__(self, win): |
||||
|
super(Jade_Handler, self).__init__(win, 'Jade') |
||||
|
|
||||
|
def message_dialog(self, msg): |
||||
|
self.clear_dialog() |
||||
|
self.dialog = dialog = WindowModalDialog(self.top_level_window(), _("Jade Status")) |
||||
|
l = QLabel(msg) |
||||
|
vbox = QVBoxLayout(dialog) |
||||
|
vbox.addWidget(l) |
||||
|
dialog.show() |
Loading…
Reference in new issue