from binascii import hexlify from struct import pack, unpack import hashlib import time import sys import traceback import electrum from electrum.bitcoin import EncodeBase58Check, DecodeBase58Check, TYPE_ADDRESS, int_to_hex, var_int from electrum.i18n import _ from electrum.plugins import BasePlugin, hook from electrum.keystore import Hardware_KeyStore, parse_xpubkey from ..hw_wallet import HW_PluginBase from electrum.util import format_satoshis_plain, print_error try: import hid from btchip.btchipComm import HIDDongleHIDAPI, DongleWait from btchip.btchip import btchip from btchip.btchipUtils import compress_public_key,format_transaction, get_regular_input_script, get_p2sh_input_script from btchip.bitcoinTransaction import bitcoinTransaction from btchip.btchipPersoWizard import StartBTChipPersoDialog from btchip.btchipFirmwareWizard import checkFirmware, updateFirmware from btchip.btchipException import BTChipException BTCHIP = True BTCHIP_DEBUG = False except ImportError: BTCHIP = False class Ledger_Client(): def __init__(self, hidDevice): self.dongleObject = btchip(hidDevice) self.preflightDone = False def is_pairable(self): return True def close(self): self.dongleObject.dongle.close() def timeout(self, cutoff): pass def is_initialized(self): return True def label(self): return "" def i4b(self, x): return pack('>I', x) def get_xpub(self, bip32_path): self.checkDevice() # bip32_path is of the form 44'/0'/1' # S-L-O-W - we don't handle the fingerprint directly, so compute # it manually from the previous node # This only happens once so it's bearable #self.get_client() # prompt for the PIN before displaying the dialog if necessary #self.handler.show_message("Computing master public key") try: splitPath = bip32_path.split('/') if splitPath[0] == 'm': splitPath = splitPath[1:] bip32_path = bip32_path[2:] fingerprint = 0 if len(splitPath) > 1: prevPath = "/".join(splitPath[0:len(splitPath) - 1]) nodeData = self.dongleObject.getWalletPublicKey(prevPath) publicKey = compress_public_key(nodeData['publicKey']) h = hashlib.new('ripemd160') h.update(hashlib.sha256(publicKey).digest()) fingerprint = unpack(">I", h.digest()[0:4])[0] nodeData = self.dongleObject.getWalletPublicKey(bip32_path) publicKey = compress_public_key(nodeData['publicKey']) depth = len(splitPath) lastChild = splitPath[len(splitPath) - 1].split('\'') if len(lastChild) == 1: childnum = int(lastChild[0]) else: childnum = 0x80000000 | int(lastChild[0]) xpub = "0488B21E".decode('hex') + chr(depth) + self.i4b(fingerprint) + self.i4b(childnum) + str(nodeData['chainCode']) + str(publicKey) except Exception, e: #self.give_error(e, True) return None finally: #self.handler.clear_dialog() pass return EncodeBase58Check(xpub) def has_detached_pin_support(self, client): try: client.getVerifyPinRemainingAttempts() return True except BTChipException, e: if e.sw == 0x6d00: return False raise e def is_pin_validated(self, client): try: # Invalid SET OPERATION MODE to verify the PIN status client.dongle.exchange(bytearray([0xe0, 0x26, 0x00, 0x00, 0x01, 0xAB])) except BTChipException, e: if (e.sw == 0x6982): return False if (e.sw == 0x6A80): return True raise e def perform_hw1_preflight(self): try: firmware = self.dongleObject.getFirmwareVersion()['version'].split(".") if not checkFirmware(firmware): self.dongleObject.close() raise Exception("HW1 firmware version too old. Please update at https://www.ledgerwallet.com") try: self.dongleObject.getOperationMode() except BTChipException, e: if (e.sw == 0x6985): self.dongleObject.close() dialog = StartBTChipPersoDialog() dialog.exec_() # Acquire the new client on the next run else: raise e if self.has_detached_pin_support(self.dongleObject) and not self.is_pin_validated(self.dongleObject) and (self.handler <> None): remaining_attempts = self.dongleObject.getVerifyPinRemainingAttempts() if remaining_attempts <> 1: msg = "Enter your Ledger PIN - remaining attempts : " + str(remaining_attempts) else: msg = "Enter your Ledger PIN - WARNING : LAST ATTEMPT. If the PIN is not correct, the dongle will be wiped." confirmed, p, pin = self.password_dialog(msg) if not confirmed: raise Exception('Aborted by user - please unplug the dongle and plug it again before retrying') pin = pin.encode() self.dongleObject.verifyPin(pin) except BTChipException, e: if (e.sw == 0x6faa): raise Exception("Dongle is temporarily locked - please unplug it and replug it again") if ((e.sw & 0xFFF0) == 0x63c0): raise Exception("Invalid PIN - please unplug the dongle and plug it again before retrying") raise e def checkDevice(self): if not self.preflightDone: self.perform_hw1_preflight() self.preflightDone = True def password_dialog(self, msg=None): response = self.handler.get_word(msg) if response is None: return False, None, None return True, response, response class Ledger_KeyStore(Hardware_KeyStore): hw_type = 'ledger' device = 'Ledger' def __init__(self, d): Hardware_KeyStore.__init__(self, d) # Errors and other user interaction is done through the wallet's # handler. The handler is per-window and preserved across # device reconnects self.force_watching_only = False self.signing = False def get_derivation(self): return self.derivation def get_client(self): return self.plugin.get_client(self) def give_error(self, message, clear_client = False): print_error(message) if not self.signing: self.handler.show_error(message) else: self.signing = False if clear_client: self.client = None raise Exception(message) def address_id_stripped(self, address): # Strip the leading "m/" change, index = self.get_address_index(address) derivation = self.derivation address_path = "%s/%d/%d"%(derivation, change, index) return address_path[2:] def decrypt_message(self, pubkey, message, password): raise RuntimeError(_('Encryption and decryption are currently not supported for %s') % self.device) def sign_message(self, sequence, message, password): self.signing = True # prompt for the PIN before displaying the dialog if necessary client = self.get_client() address_path = self.get_derivation()[2:] + "/%d/%d"%sequence self.handler.show_message("Signing message ...") try: info = self.get_client().signMessagePrepare(address_path, message) pin = "" if info['confirmationNeeded']: # TODO : handle different confirmation types. For the time being only supports keyboard 2FA confirmed, p, pin = self.password_dialog() if not confirmed: raise Exception('Aborted by user') pin = pin.encode() #self.plugin.get_client(self, True, True) signature = self.get_client().signMessageSign(pin) except BTChipException, e: if e.sw == 0x6a80: self.give_error("Unfortunately, this message cannot be signed by the Ledger wallet. Only alphanumerical messages shorter than 140 characters are supported. Please remove any extra characters (tab, carriage return) and retry.") else: self.give_error(e, True) except Exception, e: self.give_error(e, True) finally: self.handler.clear_dialog() self.signing = False # Parse the ASN.1 signature rLength = signature[3] r = signature[4 : 4 + rLength] sLength = signature[4 + rLength + 1] s = signature[4 + rLength + 2:] if rLength == 33: r = r[1:] if sLength == 33: s = s[1:] r = str(r) s = str(s) # And convert it return chr(27 + 4 + (signature[0] & 0x01)) + r + s def sign_transaction(self, tx, password): if tx.is_complete(): return client = self.get_client() self.signing = True inputs = [] inputsPaths = [] pubKeys = [] chipInputs = [] redeemScripts = [] signatures = [] preparedTrustedInputs = [] changePath = "" changeAmount = None output = None outputAmount = None p2shTransaction = False pin = "" self.get_client() # prompt for the PIN before displaying the dialog if necessary # Fetch inputs of the transaction to sign derivations = self.get_tx_derivations(tx) for txin in tx.inputs(): if txin.get('is_coinbase'): self.give_error("Coinbase not supported") # should never happen if len(txin['pubkeys']) > 1: p2shTransaction = True for i, x_pubkey in enumerate(txin['x_pubkeys']): if x_pubkey in derivations: signingPos = i s = derivations.get(x_pubkey) hwAddress = "%s/%d/%d" % (self.get_derivation()[2:], s[0], s[1]) break else: self.give_error("No matching x_key for sign_transaction") # should never happen inputs.append([txin['prev_tx'].raw, txin['prevout_n'], txin.get('redeemScript'), txin['prevout_hash'], signingPos ]) inputsPaths.append(hwAddress) pubKeys.append(txin['pubkeys']) # Sanity check if p2shTransaction: for txinput in tx.inputs(): if len(txinput['pubkeys']) < 2: self.give_error("P2SH / regular input mixed in same transaction not supported") # should never happen txOutput = var_int(len(tx.outputs())) for txout in tx.outputs(): output_type, addr, amount = txout txOutput += int_to_hex(amount, 8) script = tx.pay_script(output_type, addr) txOutput += var_int(len(script)/2) txOutput += script txOutput = txOutput.decode('hex') # Recognize outputs - only one output and one change is authorized if not p2shTransaction: if len(tx.outputs()) > 2: # should never happen self.give_error("Transaction with more than 2 outputs not supported") for i, (_type, address, amount) in enumerate(tx.outputs()): assert _type == TYPE_ADDRESS change, index = tx.output_info[i] if change: changePath = "%s/%d/%d" % (self.get_derivation()[2:], change, index) changeAmount = amount else: output = address outputAmount = amount self.handler.show_message(_("Confirm Transaction on your Ledger device...")) try: # Get trusted inputs from the original transactions for utxo in inputs: if not p2shTransaction: txtmp = bitcoinTransaction(bytearray(utxo[0].decode('hex'))) chipInputs.append(self.get_client().getTrustedInput(txtmp, utxo[1])) redeemScripts.append(txtmp.outputs[utxo[1]].script) else: tmp = utxo[3].decode('hex')[::-1].encode('hex') tmp += int_to_hex(utxo[1], 4) chipInputs.append({'value' : tmp.decode('hex')}) redeemScripts.append(bytearray(utxo[2].decode('hex'))) # Sign all inputs firstTransaction = True inputIndex = 0 rawTx = tx.serialize() while inputIndex < len(inputs): self.get_client().startUntrustedTransaction(firstTransaction, inputIndex, chipInputs, redeemScripts[inputIndex]) if not p2shTransaction: outputData = self.get_client().finalizeInput(output, format_satoshis_plain(outputAmount), format_satoshis_plain(tx.get_fee()), changePath, bytearray(rawTx.decode('hex'))) else: outputData = self.get_client().finalizeInputFull(txOutput) outputData['outputData'] = txOutput if firstTransaction: transactionOutput = outputData['outputData'] if outputData['confirmationNeeded']: # TODO : handle different confirmation types. For the time being only supports keyboard 2FA self.handler.clear_dialog() if 'keycardData' in outputData: pin2 = "" for keycardIndex in range(len(outputData['keycardData'])): msg = "Do not enter your device PIN here !\r\n\r\n" + \ "Your Ledger Wallet wants to talk to you and tell you a unique second factor code.\r\n" + \ "For this to work, please match the character between stars of the output address using your security card\r\n\r\n" + \ "Output address : " for index in range(len(output)): if index == outputData['keycardData'][keycardIndex]: msg = msg + "*" + output[index] + "*" else: msg = msg + output[index] msg = msg + "\r\n" confirmed, p, pin = self.password_dialog(msg) if not confirmed: raise Exception('Aborted by user') try: pin2 = pin2 + chr(int(pin[0], 16)) except: raise Exception('Invalid PIN character') pin = pin2 else: confirmed, p, pin = self.password_dialog() if not confirmed: raise Exception('Aborted by user') pin = pin.encode() #self.plugin.get_client(self, True, True) self.handler.show_message("Signing ...") else: # Sign input with the provided PIN inputSignature = self.get_client().untrustedHashSign(inputsPaths[inputIndex], pin) inputSignature[0] = 0x30 # force for 1.4.9+ signatures.append(inputSignature) inputIndex = inputIndex + 1 firstTransaction = False except BaseException as e: traceback.print_exc(file=sys.stdout) self.give_error(e, True) finally: self.handler.clear_dialog() # Reformat transaction inputIndex = 0 while inputIndex < len(inputs): if p2shTransaction: signaturesPack = [signatures[inputIndex]] * len(pubKeys[inputIndex]) inputScript = get_p2sh_input_script(redeemScripts[inputIndex], signaturesPack) preparedTrustedInputs.append([ ("\x00" * 4) + chipInputs[inputIndex]['value'], inputScript ]) else: inputScript = get_regular_input_script(signatures[inputIndex], pubKeys[inputIndex][0].decode('hex')) preparedTrustedInputs.append([ chipInputs[inputIndex]['value'], inputScript ]) inputIndex = inputIndex + 1 updatedTransaction = format_transaction(transactionOutput, preparedTrustedInputs) updatedTransaction = hexlify(updatedTransaction) tx.update_signatures(updatedTransaction) self.signing = False def password_dialog(self, msg=None): if not msg: msg = _("Do not enter your device PIN here !\r\n\r\n" \ "Your Ledger Wallet wants to talk to you and tell you a unique second factor code.\r\n" \ "For this to work, please open a text editor " \ "(on a different computer / device if you believe this computer is compromised) " \ "and put your cursor into it, unplug your Ledger Wallet and plug it back in.\r\n" \ "It should show itself to your computer as a keyboard " \ "and output the second factor along with a summary of " \ "the transaction being signed into the text-editor.\r\n\r\n" \ "Check that summary and then enter the second factor code here.\r\n" \ "Before clicking OK, re-plug the device once more (unplug it and plug it again if you read the second factor code on the same computer)") response = self.handler.get_word(msg) if response is None: return False, None, None return True, response, response class LedgerPlugin(HW_PluginBase): libraries_available = BTCHIP keystore_class = Ledger_KeyStore client = None DEVICE_IDS = [ (0x2581, 0x1807), # HW.1 legacy btchip (0x2581, 0x2b7c), # HW.1 transitional production (0x2581, 0x3b7c), # HW.1 ledger production (0x2581, 0x4b7c), # HW.1 ledger test (0x2c97, 0x0000), # Blue (0x2c97, 0x0001) # Nano-S ] def __init__(self, parent, config, name): HW_PluginBase.__init__(self, parent, config, name) if self.libraries_available: self.device_manager().register_devices(self.DEVICE_IDS) def btchip_is_connected(self, keystore): try: self.get_client(keystore).getFirmwareVersion() except Exception as e: return False return True def get_btchip_device(self, device): ledger = False if (device.product_key[0] == 0x2581 and device.product_key[1] == 0x3b7c) or (device.product_key[0] == 0x2581 and device.product_key[1] == 0x4b7c) or (device.product_key[0] == 0x2c97): ledger = True dev = hid.device() dev.open_path(device.path) dev.set_nonblocking(True) return HIDDongleHIDAPI(dev, ledger, BTCHIP_DEBUG) def create_client(self, device, handler): self.handler = handler client = self.get_btchip_device(device) if client <> None: client = Ledger_Client(client) return client def setup_device(self, device_info, wizard): devmgr = self.device_manager() device_id = device_info.device.id_ client = devmgr.client_by_id(device_id) #client.handler = wizard client.handler = self.create_handler(wizard) #client.get_xpub('m') client.get_xpub("m/44'/0'") # TODO replace by direct derivation once Nano S > 1.1 def get_xpub(self, device_id, derivation, wizard): devmgr = self.device_manager() client = devmgr.client_by_id(device_id) #client.handler = wizard client.handler = self.create_handler(wizard) client.checkDevice() xpub = client.get_xpub(derivation) return xpub def get_client(self, keystore, force_pair=True): # All client interaction should not be in the main GUI thread #assert self.main_thread != threading.current_thread() devmgr = self.device_manager() handler = keystore.handler handler = keystore.handler with devmgr.hid_lock: client = devmgr.client_for_keystore(self, handler, keystore, force_pair) # returns the client for a given keystore. can use xpub #if client: # client.used() if client <> None: client.checkDevice() client = client.dongleObject return client