diff --git a/contrib/requirements/requirements-hw.txt b/contrib/requirements/requirements-hw.txt index 7343f21af..bf7715615 100644 --- a/contrib/requirements/requirements-hw.txt +++ b/contrib/requirements/requirements-hw.txt @@ -6,7 +6,8 @@ hidapi<0.11 trezor[hidapi]>=0.13.0,<0.14 safet>=0.1.5 keepkey>=6.3.1 -btchip-python>=0.1.32 +btchip-python>=0.1.32 # used for legacy Ledger Bitcoin app up to versions 2.0.* or below +ledger-bitcoin=0.1.1 # used for new Ledger Bitcoin app >= v2.1.0 and above ckcc-protocol>=0.7.7 bitbox02>=6.0.0 cbor>=1.0.0,<2.0.0 diff --git a/electrum/plugins/ledger/__init__.py b/electrum/plugins/ledger/__init__.py index b97f02335..f308379f5 100644 --- a/electrum/plugins/ledger/__init__.py +++ b/electrum/plugins/ledger/__init__.py @@ -2,6 +2,6 @@ from electrum.i18n import _ fullname = 'Ledger Wallet' description = 'Provides support for Ledger hardware wallet' -requires = [('btchip', 'github.com/ledgerhq/btchip-python')] +requires = [('btchip', 'github.com/ledgerhq/btchip-python'), ('ledger_bitcoin', 'github.com/LedgerHQ/app-bitcoin-new')] registers_keystore = ('hardware', 'ledger', _("Ledger wallet")) available_for = ['qt', 'cmdline'] diff --git a/electrum/plugins/ledger/ledger.py b/electrum/plugins/ledger/ledger.py index cb0a1c2ab..35e3f172d 100644 --- a/electrum/plugins/ledger/ledger.py +++ b/electrum/plugins/ledger/ledger.py @@ -1,37 +1,42 @@ -from struct import pack, unpack +# Some parts of this code are adapted from bitcoin-core/HWI: +# https://github.com/bitcoin-core/HWI/blob/e731395bde13362950e9f13e01689c475545e4dc/hwilib/devices/ledger.py + +from abc import ABC, abstractmethod +import base64 import hashlib -import sys -import traceback -from typing import Optional, Tuple +from typing import Dict, List, Optional, Sequence, Tuple -from electrum import ecc -from electrum import bip32 -from electrum.crypto import hash_160 -from electrum.bitcoin import int_to_hex, var_int, is_segwit_script_type, is_b58_address +import ledger_bitcoin +from ledger_bitcoin import WalletPolicy, MultisigWallet, AddressType, Chain +from ledger_bitcoin.exception.errors import DenyError, NotSupportedError, SecurityStatusNotSatisfiedError +from ledger_bitcoin.key import KeyOriginInfo +from ledgercomm.interfaces.hid_device import HID + +from electrum import bip32, constants, ecc +from electrum.base_wizard import ScriptTypeNotSupported from electrum.bip32 import BIP32Node, convert_bip32_intpath_to_strpath +from electrum.bitcoin import EncodeBase58Check, int_to_hex, is_b58_address, is_segwit_script_type, var_int +from electrum.crypto import hash_160 from electrum.i18n import _ -from electrum.keystore import Hardware_KeyStore -from electrum.transaction import Transaction, PartialTransaction, PartialTxInput, PartialTxOutput -from electrum.wallet import Standard_Wallet -from electrum.util import bfh, bh2u, versiontuple, UserFacingException -from electrum.base_wizard import ScriptTypeNotSupported +from electrum.keystore import AddressIndexGeneric, Hardware_KeyStore from electrum.logging import get_logger -from electrum.plugin import runs_in_hwd_thread, Device - -from ..hw_wallet import HW_PluginBase, HardwareClientBase -from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output, LibraryFoundButUnusable +from electrum.plugin import Device, runs_in_hwd_thread +from electrum.transaction import PartialTransaction, Transaction +from electrum.util import bfh, UserFacingException, versiontuple +from electrum.wallet import Standard_Wallet +from ..hw_wallet import HardwareClientBase, HW_PluginBase +from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, validate_op_return_output _logger = get_logger(__name__) - try: import hid - from btchip.btchipComm import HIDDongleHIDAPI, DongleWait + from btchip.btchipComm import HIDDongleHIDAPI from btchip.btchip import btchip - from btchip.btchipUtils import compress_public_key,format_transaction, get_regular_input_script, get_p2sh_input_script + from btchip.btchipUtils import compress_public_key from btchip.bitcoinTransaction import bitcoinTransaction - from btchip.btchipFirmwareWizard import checkFirmware, updateFirmware + from btchip.btchipFirmwareWizard import checkFirmware from btchip.btchipException import BTChipException BTCHIP = True BTCHIP_DEBUG = False @@ -41,15 +46,78 @@ except ImportError as e: BTCHIP = False MSG_NEEDS_FW_UPDATE_GENERIC = _('Firmware version too old. Please update at') + \ - ' https://www.ledgerwallet.com' + ' https://www.ledgerwallet.com' MSG_NEEDS_FW_UPDATE_SEGWIT = _('Firmware version (or "Bitcoin" app) too old for Segwit support. Please update at') + \ - ' https://www.ledgerwallet.com' + ' https://www.ledgerwallet.com' MULTI_OUTPUT_SUPPORT = '1.1.4' SEGWIT_SUPPORT = '1.1.10' SEGWIT_SUPPORT_SPECIAL = '1.0.4' SEGWIT_TRUSTEDINPUTS = '1.4.0' +def is_policy_standard(wp: 'WalletPolicy', fpr: bytes, exp_coin_type: int) -> bool: + """Returns True if the wallet policy can be used without registration.""" + + if wp.name != "" or wp.n_keys != 1: + return False + + key_info = wp.keys_info[0] + + if key_info[0] != '[': + # no key origin info + return False + + try: + key_orig_end = key_info.index(']') + except ValueError: + # invalid key_info + return False + + key_fpr, key_path = key_info[1:key_orig_end].split('/', maxsplit=1) + + if key_fpr != fpr.hex(): + # not an internal key + return False + + key_path_parts = key_path.split('/') + + # Account key should be exactly 3 hardened derivation steps + if len(key_path_parts) != 3 or any(part[-1] != "'" for part in key_path_parts): + return False + + purpose, coin_type, account_index = key_path_parts + + if coin_type != f"{exp_coin_type}'" or int(account_index[:-1]) > 100: + return False + + if wp.descriptor_template == "pkh(@0/**)": + # BIP-44 + return purpose == "44'" + elif wp.descriptor_template == "sh(wpkh(@0/**))": + # BIP-49, nested SegWit + return purpose == "49'" + elif wp.descriptor_template == "wpkh(@0/**)": + # BIP-84, native SegWit + return purpose == "84'" + elif wp.descriptor_template == "tr(@0/**)": + # BIP-86, taproot single key + return purpose == "86'" + else: + # unknown + return False + + +def convert_xpub(xpub: str, xtype='standard') -> str: + bip32node = BIP32Node.from_xkey(xpub) + return BIP32Node( + xtype=xtype, + eckey=bip32node.eckey, + chaincode=bip32node.chaincode, + depth=bip32node.depth, + fingerprint=bip32node.fingerprint, + child_number=bip32node.child_number).to_xpub() + + def test_pin_unlocked(func): """Function decorator to test the Ledger for being unlocked, and if not, raise a human-readable exception. @@ -57,19 +125,233 @@ def test_pin_unlocked(func): def catch_exception(self, *args, **kwargs): try: return func(self, *args, **kwargs) - except BTChipException as e: - if e.sw == 0x6982: - raise UserFacingException(_('Your Ledger is locked. Please unlock it.')) - else: - raise + except SecurityStatusNotSatisfiedError: + raise UserFacingException(_('Your Ledger is locked. Please unlock it.')) return catch_exception -class Ledger_Client(HardwareClientBase): +# from HWI +def is_witness(script: bytes) -> Tuple[bool, int, bytes]: + """ + Determine whether a script is a segwit output script. + If so, also returns the witness version and witness program. + + :param script: The script + :returns: A tuple of a bool indicating whether the script is a segwit output script, + an int representing the witness version, + and the bytes of the witness program. + """ + if len(script) < 4 or len(script) > 42: + return (False, 0, b"") + + if script[0] != 0 and (script[0] < 81 or script[0] > 96): + return (False, 0, b"") + + if script[1] + 2 == len(script): + return (True, script[0] - 0x50 if script[0] else 0, script[2:]) + + return (False, 0, b"") + + +# from HWI +# Only handles up to 15 of 15. Returns None if this script is not a +# multisig script. Returns (m, pubkeys) otherwise. +def parse_multisig(script: bytes) -> Optional[Tuple[int, Sequence[bytes]]]: + """ + Determine whether a script is a multisig script. If so, determine the parameters of that multisig. + + :param script: The script + :returns: ``None`` if the script is not multisig. + If multisig, returns a tuple of the number of signers required, + and a sequence of public key bytes. + """ + # Get m + m = script[0] - 80 + if m < 1 or m > 15: + return None + + # Get pubkeys + pubkeys = [] + offset = 1 + while True: + pubkey_len = script[offset] + if pubkey_len != 33: + break + offset += 1 + pubkeys.append(script[offset:offset + 33]) + offset += 33 + + # Check things at the end + n = script[offset] - 80 + if n != len(pubkeys): + return None + offset += 1 + op_cms = script[offset] + if op_cms != 174: + return None + + return (m, pubkeys) + + +HARDENED_FLAG = 1 << 31 + + +def H_(x: int) -> int: + """ + Shortcut function that "hardens" a number in a BIP44 path. + """ + return x | HARDENED_FLAG + + +def is_hardened(i: int) -> bool: + """ + Returns whether an index is hardened + """ + return i & HARDENED_FLAG != 0 + + +def get_bip44_purpose(addrtype: 'AddressType') -> int: + """ + Determine the BIP 44 purpose based on the given :class:`~hwilib.common.AddressType`. + + :param addrtype: The address type + """ + if addrtype == AddressType.LEGACY: + return 44 + elif addrtype == AddressType.SH_WIT: + return 49 + elif addrtype == AddressType.WIT: + return 84 + elif addrtype == AddressType.TAP: + return 86 + else: + raise ValueError("Unknown address type") + + +def get_bip44_chain(chain: 'Chain') -> int: + """ + Determine the BIP 44 coin type based on the Bitcoin chain type. + + For the Bitcoin mainnet chain, this returns 0. For the other chains, this returns 1. + + :param chain: The chain + """ + if chain == Chain.MAIN: + return 0 + else: + return 1 + + +def get_addrtype_from_bip44_purpose(index: int) -> Optional['AddressType']: + purpose = index & ~HARDENED_FLAG + + if purpose == 44: + return AddressType.LEGACY + elif purpose == 49: + return AddressType.SH_WIT + elif purpose == 84: + return AddressType.WIT + elif purpose == 86: + return AddressType.TAP + else: + return None + + +def is_standard_path( + path: Sequence[int], + addrtype: 'AddressType', + chain: 'Chain', +) -> bool: + if len(path) != 5: + return False + if not is_hardened(path[0]) or not is_hardened(path[1]) or not is_hardened(path[2]): + return False + if is_hardened(path[3]) or is_hardened(path[4]): + return False + computed_addrtype = get_addrtype_from_bip44_purpose(path[0]) + if computed_addrtype is None: + return False + if computed_addrtype != addrtype: + return False + if path[1] != H_(get_bip44_chain(chain)): + return False + if path[3] not in [0, 1]: + return False + return True + + +def get_chain() -> 'Chain': + if constants.net.NET_NAME == "mainnet": + return Chain.MAIN + elif constants.net.NET_NAME == "testnet": + return Chain.TEST + elif constants.net.NET_NAME == "signet": + return Chain.SIGNET + elif constants.net.NET_NAME == "regtest": + return Chain.REGTEST + else: + raise ValueError("Unsupported network") + + +# Metaclass, concretely instantiated in Ledger_Client_Legacy and Ledger_Client_New +class Ledger_Client(HardwareClientBase, ABC): + is_legacy: bool + + def __new__(cls, hidDevice, *args, **kwargs): + transport = ledger_bitcoin.TransportClient('hid', hid=hidDevice) + cl = ledger_bitcoin.createClient(transport, chain=get_chain()) + + if isinstance(cl, ledger_bitcoin.client.NewClient): + return super().__new__(Ledger_Client_New) + else: + return super().__new__(Ledger_Client_Legacy) + def __init__(self, hidDevice, *, product_key: Tuple[int, int], plugin: HW_PluginBase): HardwareClientBase.__init__(self, plugin=plugin) - self.dongleObject = btchip(hidDevice) + + self.busy: bool = False + + def get_master_fingerprint(self) -> bytes: + return self.request_root_fingerprint_from_device() + + @abstractmethod + def show_address(self, address_path: str, txin_type: str): + pass + + @abstractmethod + def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): + pass + + @abstractmethod + def sign_message( + self, + address_path: str, + message: str, + password, + *, + script_type: Optional[str] = None, + ) -> bytes: + pass + + +class Ledger_Client_Legacy(Ledger_Client): + """Client based on the bitchip library, targeting versions 2.0.* and below.""" + is_legacy = True + + def __init__(self, hidDevice, *, product_key: Tuple[int, int], + plugin: HW_PluginBase): + Ledger_Client.__init__(self, hidDevice, product_key=product_key, plugin=plugin) + + # Hack, we close the old object and instantiate a new one + hidDevice.close() + dev = hid.device() + dev.open_path(hidDevice.path) + dev.set_nonblocking(True) + self.dongleObject = btchip(HIDDongleHIDAPI(dev, True, False)) + + self.signing = False + self.preflightDone = False self._product_key = product_key self._soft_device_id = None @@ -77,6 +359,24 @@ class Ledger_Client(HardwareClientBase): def is_pairable(self): return True + def set_and_unset_signing(func): + """Function decorator to set and unset self.signing.""" + def wrapper(self, *args, **kwargs): + try: + self.signing = True + return func(self, *args, **kwargs) + finally: + self.signing = False + return wrapper + + def give_error(self, message): + _logger.info(message) + if not self.signing: + self.handler.show_error(message) + else: + self.signing = False + raise UserFacingException(message) + @runs_in_hwd_thread def close(self): self.dongleObject.dongle.close() @@ -115,8 +415,8 @@ class Ledger_Client(HardwareClientBase): # S-L-O-W - we don't handle the fingerprint directly, so compute # it manually from the previous node # This only happens once so it's bearable - #self.get_client() # prompt for the PIN before displaying the dialog if necessary - #self.handler.show_message("Computing master public key") + # self.get_client() # prompt for the PIN before displaying the dialog if necessary + # self.handler.show_message("Computing master public key") if xtype in ['p2wpkh', 'p2wsh'] and not self.supports_native_segwit(): raise UserFacingException(MSG_NEEDS_FW_UPDATE_SEGWIT) if xtype in ['p2wpkh-p2sh', 'p2wsh-p2sh'] and not self.supports_segwit(): @@ -237,111 +537,41 @@ class Ledger_Client(HardwareClientBase): return False, None, None return True, response, response - -class Ledger_KeyStore(Hardware_KeyStore): - hw_type = 'ledger' - device = 'Ledger' - - plugin: 'LedgerPlugin' - - def __init__(self, d): - Hardware_KeyStore.__init__(self, d) - self.signing = False - self.cfg = d.get('cfg', {'mode': 0}) - - def dump(self): - obj = Hardware_KeyStore.dump(self) - obj['cfg'] = self.cfg - return obj - - def get_client_dongle_object(self, *, client: Optional['Ledger_Client'] = None) -> 'btchip': - if client is None: - client = self.get_client() - return client.dongleObject - - def give_error(self, message): - _logger.info(message) - if not self.signing: - self.handler.show_error(message) - else: - self.signing = False - raise UserFacingException(message) - - def set_and_unset_signing(func): - """Function decorator to set and unset self.signing.""" - def wrapper(self, *args, **kwargs): - try: - self.signing = True - return func(self, *args, **kwargs) - finally: - self.signing = False - return wrapper - - def decrypt_message(self, pubkey, message, password): - raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device)) - @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing - def sign_message(self, sequence, message, password, *, script_type=None): - message = message.encode('utf8') - message_hash = hashlib.sha256(message).hexdigest().upper() - # prompt for the PIN before displaying the dialog if necessary - client_electrum = self.get_client() - client_ledger = self.get_client_dongle_object(client=client_electrum) - address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence - self.handler.show_message("Signing message ...\r\nMessage hash: "+message_hash) + def show_address(self, address_path: str, txin_type: str): + self.handler.show_message(_("Showing address ...")) + segwit = is_segwit_script_type(txin_type) + segwitNative = txin_type == 'p2wpkh' try: - info = client_ledger.signMessagePrepare(address_path, message) - pin = "" - if info['confirmationNeeded']: - # do the authenticate dialog and get pin: - pin = self.handler.get_auth(info, client=client_electrum) - if not pin: - raise UserWarning(_('Cancelled by user')) - pin = str(pin).encode() - signature = client_ledger.signMessageSign(pin) + self.dongleObject.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative) except BTChipException as e: - if e.sw == 0x6a80: - self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. " - "Only alphanumerical messages shorter than 140 characters are supported. " - "Please remove any extra characters (tab, carriage return) and retry.") - elif e.sw == 0x6985: # cancelled by user - return b'' + if e.sw == 0x6985: # cancelled by user + pass elif e.sw == 0x6982: raise # pin lock. decorator will catch it + elif e.sw == 0x6b00: # hw.1 raises this + self.handler.show_error('{}\n{}\n{}'.format( + _('Error showing address') + ':', + e, + _('Your device might not have support for this functionality.'))) else: - self.give_error(e) - except UserWarning: - self.handler.show_error(_('Cancelled by user')) - return b'' - except Exception as e: - self.give_error(e) + _logger.exception('') + self.handler.show_error(e) + except BaseException as e: + _logger.exception('') + self.handler.show_error(e) finally: self.handler.finished() - # Parse the ASN.1 signature - rLength = signature[3] - r = signature[4 : 4 + rLength] - sLength = signature[4 + rLength + 1] - s = signature[4 + rLength + 2:] - if rLength == 33: - r = r[1:] - if sLength == 33: - s = s[1:] - # And convert it - - # Pad r and s points with 0x00 bytes when the point is small to get valid signature. - r_padded = bytes([0x00]) * (32 - len(r)) + r - s_padded = bytes([0x00]) * (32 - len(s)) + s - - return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing - def sign_transaction(self, tx, password): + def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): if tx.is_complete(): return + inputs = [] inputsPaths = [] chipInputs = [] @@ -352,9 +582,6 @@ class Ledger_KeyStore(Hardware_KeyStore): segwitTransaction = False pin = "" # prompt for the PIN before displaying the dialog if necessary - client_electrum = self.get_client() - assert client_electrum - client_ledger = self.get_client_dongle_object(client=client_electrum) # Fetch inputs of the transaction to sign for txin in tx.inputs(): @@ -365,16 +592,16 @@ class Ledger_KeyStore(Hardware_KeyStore): p2shTransaction = True if txin.script_type in ['p2wpkh-p2sh', 'p2wsh-p2sh']: - if not client_electrum.supports_segwit(): + if not self.supports_segwit(): self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT) segwitTransaction = True if txin.script_type in ['p2wpkh', 'p2wsh']: - if not client_electrum.supports_native_segwit(): + if not self.supports_native_segwit(): self.give_error(MSG_NEEDS_FW_UPDATE_SEGWIT) segwitTransaction = True - my_pubkey, full_path = self.find_my_pubkey_in_txinout(txin) + my_pubkey, full_path = keystore.find_my_pubkey_in_txinout(txin) if not full_path: self.give_error("No matching pubkey for sign_transaction") # should never happen full_path = convert_bip32_intpath_to_strpath(full_path)[2:] @@ -397,24 +624,24 @@ class Ledger_KeyStore(Hardware_KeyStore): if p2shTransaction: for txin in tx.inputs(): if txin.script_type != 'p2sh': - self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen + self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen txOutput = var_int(len(tx.outputs())) for o in tx.outputs(): txOutput += int_to_hex(o.value, 8) script = o.scriptpubkey.hex() - txOutput += var_int(len(script)//2) + txOutput += var_int(len(script) // 2) txOutput += script txOutput = bfh(txOutput) - if not client_electrum.supports_multi_output(): + if not self.supports_multi_output(): if len(tx.outputs()) > 2: self.give_error("Transaction with more than 2 outputs not supported") for txout in tx.outputs(): - if client_electrum.is_hw1() and txout.address and not is_b58_address(txout.address): + if self.is_hw1() and txout.address and not is_b58_address(txout.address): self.give_error(_("This {} device can only send to base58 addresses.").format(self.device)) if not txout.address: - if client_electrum.is_hw1(): + if self.is_hw1(): self.give_error(_("Only address outputs are supported by {}").format(self.device)) # note: max_size based on https://github.com/LedgerHQ/ledger-app-btc/commit/3a78dee9c0484821df58975803e40d58fbfc2c38#diff-c61ccd96a6d8b54d48f54a3bc4dfa7e2R26 validate_op_return_output(txout, max_size=190) @@ -431,7 +658,7 @@ class Ledger_KeyStore(Hardware_KeyStore): # prioritise hiding outputs on the 'change' branch from user # because no more than one change address allowed if txout.is_change == any_output_on_change_branch: - my_pubkey, changePath = self.find_my_pubkey_in_txinout(txout) + my_pubkey, changePath = keystore.find_my_pubkey_in_txinout(txout) assert changePath changePath = convert_bip32_intpath_to_strpath(changePath)[2:] has_change = True @@ -443,18 +670,17 @@ class Ledger_KeyStore(Hardware_KeyStore): try: # Get trusted inputs from the original transactions for input_idx, utxo in enumerate(inputs): - self.handler.show_message(_("Preparing transaction inputs...") - + f" (phase1, {input_idx}/{len(inputs)})") + self.handler.show_message(_("Preparing transaction inputs...") + f" (phase1, {input_idx}/{len(inputs)})") sequence = int_to_hex(utxo[5], 4) - if segwitTransaction and not client_electrum.supports_segwit_trustedInputs(): + if segwitTransaction and not self.supports_segwit_trustedInputs(): tmp = bfh(utxo[3])[::-1] tmp += bfh(int_to_hex(utxo[1], 4)) tmp += bfh(int_to_hex(utxo[6], 8)) # txin['value'] - chipInputs.append({'value' : tmp, 'witness' : True, 'sequence' : sequence}) + chipInputs.append({'value': tmp, 'witness': True, 'sequence': sequence}) redeemScripts.append(bfh(utxo[2])) - elif (not p2shTransaction) or client_electrum.supports_multi_output(): + elif (not p2shTransaction) or self.supports_multi_output(): txtmp = bitcoinTransaction(bfh(utxo[0])) - trustedInput = client_ledger.getTrustedInput(txtmp, utxo[1]) + trustedInput = self.dongleObject.getTrustedInput(txtmp, utxo[1]) trustedInput['sequence'] = sequence if segwitTransaction: trustedInput['witness'] = True @@ -466,7 +692,7 @@ class Ledger_KeyStore(Hardware_KeyStore): else: tmp = bfh(utxo[3])[::-1] tmp += bfh(int_to_hex(utxo[1], 4)) - chipInputs.append({'value' : tmp, 'sequence' : sequence}) + chipInputs.append({'value': tmp, 'sequence': sequence}) redeemScripts.append(bfh(utxo[2])) self.handler.show_message(_("Confirm Transaction on your Ledger device...")) @@ -474,30 +700,28 @@ class Ledger_KeyStore(Hardware_KeyStore): firstTransaction = True inputIndex = 0 rawTx = tx.serialize_to_network() - client_ledger.enableAlternate2fa(False) + self.dongleObject.enableAlternate2fa(False) if segwitTransaction: - client_ledger.startUntrustedTransaction(True, inputIndex, - chipInputs, redeemScripts[inputIndex], version=tx.version) + self.dongleObject.startUntrustedTransaction(True, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch - outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) + outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) outputData['outputData'] = txOutput if outputData['confirmationNeeded']: outputData['address'] = output self.handler.finished() # do the authenticate dialog and get pin: - pin = self.handler.get_auth(outputData, client=client_electrum) + pin = self.handler.get_auth(outputData, client=self) if not pin: raise UserWarning() self.handler.show_message(_("Confirmed. Signing Transaction...")) while inputIndex < len(inputs): - self.handler.show_message(_("Signing transaction...") - + f" (phase2, {inputIndex}/{len(inputs)})") + self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})") singleInput = [chipInputs[inputIndex]] - client_ledger.startUntrustedTransaction(False, 0, - singleInput, redeemScripts[inputIndex], version=tx.version) - inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) - inputSignature[0] = 0x30 # force for 1.4.9+ + self.dongleObject.startUntrustedTransaction(False, 0, + singleInput, redeemScripts[inputIndex], version=tx.version) + inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) + inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, signing_pubkey=my_pubkey.hex(), @@ -505,26 +729,24 @@ class Ledger_KeyStore(Hardware_KeyStore): inputIndex = inputIndex + 1 else: while inputIndex < len(inputs): - self.handler.show_message(_("Signing transaction...") - + f" (phase2, {inputIndex}/{len(inputs)})") - client_ledger.startUntrustedTransaction(firstTransaction, inputIndex, - chipInputs, redeemScripts[inputIndex], version=tx.version) + self.handler.show_message(_("Signing transaction...") + f" (phase2, {inputIndex}/{len(inputs)})") + self.dongleObject.startUntrustedTransaction(firstTransaction, inputIndex, chipInputs, redeemScripts[inputIndex], version=tx.version) # we don't set meaningful outputAddress, amount and fees # as we only care about the alternateEncoding==True branch - outputData = client_ledger.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) + outputData = self.dongleObject.finalizeInput(b'', 0, 0, changePath, bfh(rawTx)) outputData['outputData'] = txOutput if outputData['confirmationNeeded']: outputData['address'] = output self.handler.finished() # do the authenticate dialog and get pin: - pin = self.handler.get_auth(outputData, client=client_electrum) + pin = self.handler.get_auth(outputData, client=self) if not pin: raise UserWarning() self.handler.show_message(_("Confirmed. Signing Transaction...")) else: # Sign input with the provided PIN - inputSignature = client_ledger.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) - inputSignature[0] = 0x30 # force for 1.4.9+ + inputSignature = self.dongleObject.untrustedHashSign(inputsPaths[inputIndex], pin, lockTime=tx.locktime) + inputSignature[0] = 0x30 # force for 1.4.9+ my_pubkey = inputs[inputIndex][4] tx.add_signature_to_txin(txin_idx=inputIndex, signing_pubkey=my_pubkey.hex(), @@ -540,10 +762,10 @@ class Ledger_KeyStore(Hardware_KeyStore): elif e.sw == 0x6982: raise # pin lock. decorator will catch it else: - self.logger.exception('') + _logger.exception('') self.give_error(e) except BaseException as e: - self.logger.exception('') + _logger.exception('') self.give_error(e) finally: self.handler.finished() @@ -551,57 +773,529 @@ class Ledger_KeyStore(Hardware_KeyStore): @runs_in_hwd_thread @test_pin_unlocked @set_and_unset_signing - def show_address(self, sequence, txin_type): - client_ledger = self.get_client_dongle_object() - address_path = self.get_derivation_prefix()[2:] + "/%d/%d"%sequence - self.handler.show_message(_("Showing address ...")) - segwit = is_segwit_script_type(txin_type) - segwitNative = txin_type == 'p2wpkh' + def sign_message( + self, + address_path: str, + sequence: 'AddressIndexGeneric', + message: str, + password, + *, + script_type: Optional[str] = None, + ) -> bytes: + message = message.encode('utf8') + message_hash = hashlib.sha256(message).hexdigest().upper() + + self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash) try: - client_ledger.getWalletPublicKey(address_path, showOnScreen=True, segwit=segwit, segwitNative=segwitNative) + info = self.dongleObject.signMessagePrepare(address_path, message) + pin = "" + if info['confirmationNeeded']: + # do the authenticate dialog and get pin: + pin = self.handler.get_auth(info, client=self) + if not pin: + raise UserWarning(_('Cancelled by user')) + pin = str(pin).encode() + signature = self.dongleObject.signMessageSign(pin) except BTChipException as e: - if e.sw == 0x6985: # cancelled by user - pass + if e.sw == 0x6a80: + self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. " + "Only alphanumerical messages shorter than 140 characters are supported. " + "Please remove any extra characters (tab, carriage return) and retry.") + elif e.sw == 0x6985: # cancelled by user + return b'' elif e.sw == 0x6982: raise # pin lock. decorator will catch it - elif e.sw == 0x6b00: # hw.1 raises this - self.handler.show_error('{}\n{}\n{}'.format( - _('Error showing address') + ':', - e, - _('Your device might not have support for this functionality.'))) else: - self.logger.exception('') - self.handler.show_error(e) + self.give_error(e) + except UserWarning: + self.handler.show_error(_('Cancelled by user')) + return b'' + except Exception as e: + self.give_error(e) + finally: + self.handler.finished() + # Parse the ASN.1 signature + rLength = signature[3] + r = signature[4: 4 + rLength] + sLength = signature[4 + rLength + 1] + s = signature[4 + rLength + 2:] + if rLength == 33: + r = r[1:] + if sLength == 33: + s = s[1:] + # And convert it + + # Pad r and s points with 0x00 bytes when the point is small to get valid signature. + r_padded = bytes([0x00]) * (32 - len(r)) + r + s_padded = bytes([0x00]) * (32 - len(s)) + s + + return bytes([27 + 4 + (signature[0] & 0x01)]) + r_padded + s_padded + + +class Ledger_Client_New(Ledger_Client): + """Client based on the ledger_bitcoin library, targeting versions 2.1.* and above.""" + + is_legacy = False + + def __init__(self, hidDevice, *, product_key: Tuple[int, int], + plugin: HW_PluginBase): + Ledger_Client.__init__(self, hidDevice, product_key=product_key, plugin=plugin) + + transport = ledger_bitcoin.TransportClient('hid', hid=hidDevice) + self.client = ledger_bitcoin.client.NewClient(transport, get_chain()) + + self._product_key = product_key + self._soft_device_id = None + + self.master_fingerprint = None + + self._known_xpubs: Dict[str, str] = {} # path ==> xpub + self._registered_policies: Dict[bytes, bytes] = {} # wallet id => wallet hmac + + def is_pairable(self): + return True + + @runs_in_hwd_thread + def close(self): + self.client.stop() + + def is_initialized(self): + return True + + @runs_in_hwd_thread + def get_soft_device_id(self): + if self._soft_device_id is None: + self._soft_device_id = self.request_root_fingerprint_from_device() + return self._soft_device_id + + def device_model_name(self): + return LedgerPlugin.device_name_from_product_key(self._product_key) + + @runs_in_hwd_thread + def has_usable_connection_with_device(self): + try: + self.client.get_version() + except BaseException: + return False + return True + + @runs_in_hwd_thread + @test_pin_unlocked + def get_xpub(self, bip32_path, xtype): + # try silently first; if not a standard path, repeat with on-screen display + + bip32_path = bip32_path.replace('h', '\'') + + # cache known path/xpubs combinations in order to avoid requesting them many times + if bip32_path in self._known_xpubs: + xpub = self._known_xpubs[bip32_path] + else: + try: + xpub = self.client.get_extended_pubkey(bip32_path) + except NotSupportedError: + xpub = self.client.get_extended_pubkey(bip32_path, True) + self._known_xpubs[bip32_path] = xpub + + # Ledger always returns 'standard' xpubs; convert to the right xtype + return convert_xpub(xpub, xtype) + + @runs_in_hwd_thread + def request_root_fingerprint_from_device(self) -> str: + return self.client.get_master_fingerprint().hex() + + @runs_in_hwd_thread + @test_pin_unlocked + def get_master_fingerprint(self) -> bytes: + if self.master_fingerprint is None: + self.master_fingerprint = self.client.get_master_fingerprint() + return self.master_fingerprint + + @runs_in_hwd_thread + @test_pin_unlocked + def get_singlesig_default_wallet_policy(self, addr_type: 'AddressType', account: int) -> 'WalletPolicy': + assert account >= HARDENED_FLAG + + if addr_type == AddressType.LEGACY: + template = "pkh(@0/**)" + elif addr_type == AddressType.WIT: + template = "wpkh(@0/**)" + elif addr_type == AddressType.SH_WIT: + template = "sh(wpkh(@0/**))" + elif addr_type == AddressType.TAP: + template = "tr(@0/**)" + else: + raise ValueError("Unknown address type") + + fpr = self.get_master_fingerprint() + key_origin_steps = f"{get_bip44_purpose(addr_type)}'/{get_bip44_chain(self.client.chain)}'/{account & ~HARDENED_FLAG}'" + xpub = self.get_xpub(f"m/{key_origin_steps}", 'standard') + key_str = f"[{fpr.hex()}/{key_origin_steps}]{xpub}" + + # Make the Wallet object + return WalletPolicy(name="", descriptor_template=template, keys_info=[key_str]) + + @runs_in_hwd_thread + @test_pin_unlocked + def get_singlesig_policy_for_path(self, path: str, xtype: str, master_fp: bytes) -> Optional['WalletPolicy']: + path = path.replace("h", "'") + path_parts = path.split("/") + + if not 5 <= len(path_parts) <= 6: + raise UserFacingException(f"Unsupported path: {path}") + + path_root = "/".join(path_parts[:-2]) + + fpr = self.get_master_fingerprint() + + # Ledger always uses standard xpubs in wallet policies + xpub = self.get_xpub(f"m/{path_root}", 'standard') + + key_info = f"[{fpr.hex()}/{path_root}]{xpub}" + + if xtype == 'p2pkh': + name = "Legacy P2PKH" + descriptor_template = "pkh(@0/**)" + elif xtype == 'p2wpkh-p2sh': + name = "Nested SegWit" + descriptor_template = "sh(wpkh(@0/**))" + elif xtype == 'p2wpkh': + name = "SegWit" + descriptor_template = "wpkh(@0/**)" + elif xtype == 'p2tr': + name = "Taproot" + descriptor_template = "tr(@0/**)" + else: + return None + + policy = WalletPolicy("", descriptor_template, [key_info]) + if is_policy_standard(policy, master_fp, constants.net.BIP44_COIN_TYPE): + return policy + + # Non standard policy, so give it a name + return WalletPolicy(name, descriptor_template, [key_info]) + + def password_dialog(self, msg=None): + response = self.handler.get_word(msg) + if response is None: + return False, None, None + return True, response, response + + def _register_policy_if_needed(self, wallet_policy: 'WalletPolicy') -> Tuple[bytes, bytes]: + # If the policy is not register, registers it and saves its hmac on success + # Returns the pair of wallet id and wallet hmac + if wallet_policy.id not in self._registered_policies: + wallet_id, wallet_hmac = self.client.register_wallet(wallet_policy) + assert wallet_id == wallet_policy.id + self._registered_policies[wallet_id] = wallet_hmac + return wallet_policy.id, self._registered_policies[wallet_policy.id] + + @runs_in_hwd_thread + @test_pin_unlocked + def show_address(self, address_path: str, txin_type: str): + client_ledger = self.client + self.handler.show_message(_("Showing address ...")) + + # TODO: generalize for multisignature + + try: + master_fp = client_ledger.get_master_fingerprint() + wallet_policy = self.get_singlesig_policy_for_path(address_path, txin_type, master_fp) + + change, addr_index = [int(i) for i in address_path.split("/")[-2:]] + + wallet_hmac = None + if not is_policy_standard(wallet_policy, master_fp, constants.net.BIP44_COIN_TYPE): + wallet_id, wallet_hmac = self._register_policy_if_needed(wallet_policy) + + self.client.get_wallet_address(wallet_policy, wallet_hmac, change, addr_index, True) + except DenyError: + pass # cancelled by user + except BaseException as e: + _logger.exception('Error while showing an address') + self.handler.show_error(e) + finally: + self.handler.finished() + + @runs_in_hwd_thread + @test_pin_unlocked + def sign_transaction(self, keystore: Hardware_KeyStore, tx: PartialTransaction, password: str): + if tx.is_complete(): + return + + # mostly adapted from HWI + + psbt_bytes = tx.serialize_as_bytes() + psbt = ledger_bitcoin.client.PSBT() + psbt.deserialize(base64.b64encode(psbt_bytes).decode('ascii')) + + try: + + master_fp = self.client.get_master_fingerprint() + + # Figure out which wallets are signing + wallets: Dict[bytes, Tuple[AddressType, WalletPolicy, Optional[bytes]]] = {} + for input_num, (electrum_txin, psbt_in) in enumerate(zip(tx.inputs(), psbt.inputs)): + if electrum_txin.is_coinbase_input(): + raise UserFacingException("Coinbase not supported") # should never happen + + utxo = None + scriptcode = b"" + if psbt_in.witness_utxo: + utxo = psbt_in.witness_utxo + if psbt_in.non_witness_utxo: + if psbt_in.prev_txid != psbt_in.non_witness_utxo.hash: + raise UserFacingException(f"Input {input_num} has a non_witness_utxo with the wrong hash") + assert psbt_in.prev_out is not None + utxo = psbt_in.non_witness_utxo.vout[psbt_in.prev_out] + + if utxo is None: + continue + scriptcode = utxo.scriptPubKey + + p2sh = False + if electrum_txin.script_type in ['p2sh', 'p2wpkh-p2sh']: + if len(psbt_in.redeem_script) == 0: + continue + scriptcode = psbt_in.redeem_script + p2sh = True + + is_wit, wit_ver, __ = is_witness(scriptcode) + + script_addrtype = AddressType.LEGACY + if is_wit: + # if it's a segwit spend (any version), make sure the witness_utxo is also present + psbt_in.witness_utxo = utxo + + if p2sh: + if wit_ver == 0: + script_addrtype = AddressType.SH_WIT + else: + raise UserFacingException("Cannot have witness v1+ in p2sh") + else: + if wit_ver == 0: + script_addrtype = AddressType.WIT + elif wit_ver == 1: + script_addrtype = AddressType.TAP + else: + continue + + # Check if P2WSH + if electrum_txin.script_type in ['p2wsh']: + if len(psbt_in.witness_script) == 0: + continue + scriptcode = psbt_in.witness_script + + multisig = parse_multisig(scriptcode) + if multisig is not None: + k, ms_pubkeys = multisig + + # Figure out the parent xpubs + key_exprs: List[str] = [] + ok = True + our_keys = 0 + for pub in ms_pubkeys: + if pub in psbt_in.hd_keypaths: + pk_origin = psbt_in.hd_keypaths[pub] + if pk_origin.fingerprint == master_fp: + our_keys += 1 + + for xpub_bytes, xpub_origin in psbt.xpub.items(): + xpub_str = EncodeBase58Check(xpub_bytes) + if (xpub_origin.fingerprint == pk_origin.fingerprint) and (xpub_origin.path == pk_origin.path[:len(xpub_origin.path)]): + key_origin_full = pk_origin.to_string().replace('h', '\'') + # strip last two steps of derivation + key_origin_parts = key_origin_full.split('/') + if len(key_origin_parts) < 3: + raise UserFacingException(_('Unable to sign this transaction')) + key_origin = '/'.join(key_origin_parts[:-2]) + + key_exprs.append(f"[{key_origin}]{xpub_str}") + break + + else: + # No xpub, Ledger will not accept this multisig + ok = False + + if not ok: + continue + + # Electrum uses sortedmulti; we make sure that the array of key information is normalized in a consistent order + key_exprs = list(sorted(key_exprs)) + + # Make and register the MultisigWallet + msw = MultisigWallet(f"{k} of {len(key_exprs)} Multisig", script_addrtype, k, key_exprs) + msw_id = msw.id + if msw_id not in wallets: + __, registered_hmac = self._register_policy_if_needed(msw) + wallets[msw_id] = ( + script_addrtype, + msw, + registered_hmac, + ) + else: + def process_origin(origin: KeyOriginInfo) -> None: + if is_standard_path(origin.path, script_addrtype, get_chain()): + # these policies do not need to be registered + policy = self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]) + wallets[policy.id] = ( + script_addrtype, + self.get_singlesig_default_wallet_policy(script_addrtype, origin.path[2]), + None, # Wallet hmac + ) + else: + # register the policy + if script_addrtype == AddressType.LEGACY: + name = "Legacy" + template = "pkh(@0/**)" + elif script_addrtype == AddressType.WIT: + name = "Native SegWit" + template = "wpkh(@0/**)" + elif script_addrtype == AddressType.SH_WIT: + name = "Nested SegWit" + template = "sh(wpkh(@0/**))" + elif script_addrtype == AddressType.TAP: + name = "Taproot" + template = "tr(@0/**)" + else: + raise ValueError("Unknown address type") + + key_origin_info = origin.to_string() + key_origin_steps = key_origin_info.replace('h', '\'').split('/')[1:] + if len(key_origin_steps) < 3: + # Skip this input, not able to sign + return + + # remove the last two steps + account_key_origin = "/".join(key_origin_steps[:-2]) + + # get the account-level xpub + xpub = self.get_xpub(f"m/{account_key_origin}", 'standard') + key_str = f"[{master_fp.hex()}/{account_key_origin}]{xpub}" + + policy = WalletPolicy(name, template, [key_str]) + __, registered_hmac = self.client.register_wallet(policy) + wallets[policy.id] = ( + script_addrtype, + policy, + registered_hmac, + ) + for key, origin in psbt_in.hd_keypaths.items(): + if origin.fingerprint == master_fp: + process_origin(origin) + + for key, (__, origin) in psbt_in.tap_bip32_paths.items(): + # TODO: Support script path signing + if key == psbt_in.tap_internal_key and origin.fingerprint == master_fp: + process_origin(origin) + + self.handler.show_message(_("Confirm Transaction on your Ledger device...")) + + if len(wallets) == 0: + # Could not find a WalletPolicy to sign with + raise UserFacingException(_('Unable to sign this transaction')) + + # For each wallet, sign + for __, (__, wallet, wallet_hmac) in wallets.items(): + input_sigs = self.client.sign_psbt(psbt, wallet, wallet_hmac) + for idx, pubkey, sig in input_sigs: + tx.add_signature_to_txin(txin_idx=idx, signing_pubkey=pubkey.hex(), sig=sig.hex()) + except DenyError: + pass # cancelled by user + except BaseException as e: + _logger.exception('Error while signing') + self.handler.show_error(e) + finally: + self.handler.finished() + + @runs_in_hwd_thread + @test_pin_unlocked + def sign_message( + self, + address_path: str, + message: str, + password, + *, + script_type: Optional[str] = None, + ) -> bytes: + message = message.encode('utf8') + message_hash = hashlib.sha256(message).hexdigest().upper() + # prompt for the PIN before displaying the dialog if necessary + self.handler.show_message("Signing message ...\r\nMessage hash: " + message_hash) + + result = b'' + try: + result = base64.b64decode(self.client.sign_message(message, address_path)) + except DenyError: + pass # cancelled by user except BaseException as e: - self.logger.exception('') + _logger.exception('') self.handler.show_error(e) finally: self.handler.finished() + return result + + +class Ledger_KeyStore(Hardware_KeyStore): + """Ledger keystore. Targets all versions, will have different behavior with different clients.""" + + hw_type = 'ledger' + device = 'Ledger' + + plugin: 'LedgerPlugin' + + def __init__(self, d): + Hardware_KeyStore.__init__(self, d) + self.cfg = d.get('cfg', {'mode': 0}) + + def dump(self): + obj = Hardware_KeyStore.dump(self) + obj['cfg'] = self.cfg + return obj + + def get_client_dongle_object(self, *, client: Optional[Ledger_Client] = None) -> Ledger_Client: + if client is None: + client = self.get_client() + return client + + def decrypt_message(self, pubkey, message, password): + raise UserFacingException(_('Encryption and decryption are currently not supported for {}').format(self.device)) + + def sign_message(self, sequence, *args, **kwargs): + address_path = self.get_derivation_prefix()[2:] + "/%d/%d" % sequence + return self.get_client_dongle_object().sign_message(address_path, *args, **kwargs) + + def sign_transaction(self, *args, **kwargs): + return self.get_client_dongle_object().sign_transaction(self, *args, **kwargs) + + def show_address(self, sequence, *args, **kwargs): + address_path = self.get_derivation_prefix()[2:] + "/%d/%d" % sequence + return self.get_client_dongle_object().show_address(address_path, *args, **kwargs) + + class LedgerPlugin(HW_PluginBase): keystore_class = Ledger_KeyStore - minimum_library = (0, 1, 32) - DEVICE_IDS = [ - (0x2581, 0x1807), # HW.1 legacy btchip - (0x2581, 0x2b7c), # HW.1 transitional production - (0x2581, 0x3b7c), # HW.1 ledger production - (0x2581, 0x4b7c), # HW.1 ledger test - (0x2c97, 0x0000), # Blue - (0x2c97, 0x0001), # Nano-S - (0x2c97, 0x0004), # Nano-X - (0x2c97, 0x0005), # Nano-S Plus - (0x2c97, 0x0006), # RFU - (0x2c97, 0x0007), # RFU - (0x2c97, 0x0008), # RFU - (0x2c97, 0x0009), # RFU - (0x2c97, 0x000a) # RFU - ] + minimum_library = (0, 1, 1) + DEVICE_IDS = [(0x2581, 0x1807), # HW.1 legacy btchip + (0x2581, 0x2b7c), # HW.1 transitional production + (0x2581, 0x3b7c), # HW.1 ledger production + (0x2581, 0x4b7c), # HW.1 ledger test + (0x2c97, 0x0000), # Blue + (0x2c97, 0x0001), # Nano-S + (0x2c97, 0x0004), # Nano-X + (0x2c97, 0x0005), # Nano-S Plus + (0x2c97, 0x0006), # RFU + (0x2c97, 0x0007), # RFU + (0x2c97, 0x0008), # RFU + (0x2c97, 0x0009), # RFU + (0x2c97, 0x000a)] # RFU VENDOR_IDS = (0x2c97,) LEDGER_MODEL_IDS = { 0x10: "Ledger Nano S", 0x40: "Ledger Nano X", 0x50: "Ledger Nano S Plus", } + SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') def __init__(self, parent, config, name): @@ -609,6 +1303,7 @@ class LedgerPlugin(HW_PluginBase): HW_PluginBase.__init__(self, parent, config, name) self.libraries_available = self.check_libraries_available() if not self.libraries_available: + print("Library unavailable") return # to support legacy devices and legacy firmwares self.device_manager().register_devices(self.DEVICE_IDS, plugin=self) @@ -616,17 +1311,8 @@ class LedgerPlugin(HW_PluginBase): self.device_manager().register_vendor_ids(self.VENDOR_IDS, plugin=self) def get_library_version(self): - try: - import btchip - version = btchip.__version__ - except ImportError: - raise - except: - version = "unknown" - if BTCHIP: - return version - else: - raise LibraryFoundButUnusable(library_version=version) + # Older versions of the device would rather require the btchip library + return ledger_bitcoin.__version__ @classmethod def _recognize_device(cls, product_key) -> Tuple[bool, Optional[str]]: @@ -668,7 +1354,8 @@ class LedgerPlugin(HW_PluginBase): return device @runs_in_hwd_thread - def get_btchip_device(self, device): + def get_btchip_device(self, device: Device) -> Optional['HID']: + # TODO: refactor ledger = False if device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c: ledger = True @@ -679,13 +1366,16 @@ class LedgerPlugin(HW_PluginBase): ledger = True else: return None # non-compatible interface of a Nano S or Blue - dev = hid.device() - dev.open_path(device.path) - dev.set_nonblocking(True) - return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG) + + btchip_device = HID() + btchip_device.path = device.path + btchip_device.open() + + return btchip_device @runs_in_hwd_thread - def create_client(self, device, handler): + def create_client(self, device, handler) -> Ledger_Client: + # TODO: refactor client = self.get_btchip_device(device) if client is not None: client = Ledger_Client(client, product_key=device.product_key, plugin=self) @@ -693,32 +1383,16 @@ class LedgerPlugin(HW_PluginBase): def setup_device(self, device_info, wizard, purpose): device_id = device_info.device.id_ - client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) + client: Ledger_Client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) wizard.run_task_without_blocking_gui( - task=lambda: client.get_xpub("m/0'", 'standard')) # TODO replace by direct derivation once Nano S > 1.1 + task=lambda: client.get_master_fingerprint()) return client def get_xpub(self, device_id, derivation, xtype, wizard): if xtype not in self.SUPPORTED_XTYPES: raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) client = self.scan_and_create_client_for_device(device_id=device_id, wizard=wizard) - client.checkDevice() - xpub = client.get_xpub(derivation, xtype) - return xpub - - @runs_in_hwd_thread - def get_client(self, keystore, force_pair=True, *, - devices=None, allow_user_interaction=True): - # All client interaction should not be in the main GUI thread - client = super().get_client(keystore, force_pair, - devices=devices, - allow_user_interaction=allow_user_interaction) - # returns the client for a given keystore. can use xpub - #if client: - # client.used() - if client is not None: - client.checkDevice() - return client + return client.get_xpub(derivation, xtype) @runs_in_hwd_thread def show_address(self, wallet, address, keystore=None): @@ -731,4 +1405,5 @@ class LedgerPlugin(HW_PluginBase): return sequence = wallet.get_address_index(address) txin_type = wallet.get_txin_type(address) + keystore.show_address(sequence, txin_type) diff --git a/electrum/plugins/ledger/qt.py b/electrum/plugins/ledger/qt.py index 3ce0bb8f0..750f25b44 100644 --- a/electrum/plugins/ledger/qt.py +++ b/electrum/plugins/ledger/qt.py @@ -28,7 +28,7 @@ class Plugin(LedgerPlugin, QtPluginBase): keystore = wallet.get_keystore() if type(keystore) == self.keystore_class and len(addrs) == 1: def show_address(): - keystore.thread.add(partial(self.show_address, wallet, addrs[0])) + keystore.thread.add(partial(self.show_address, wallet, addrs[0], keystore=keystore)) menu.addAction(_("Show on Ledger"), show_address) class Ledger_Handler(QtHandlerBase):