committed by
GitHub
13 changed files with 1064 additions and 2 deletions
@ -0,0 +1,65 @@ |
|||
|
|||
# Coldcard Hardware Wallet Plugin |
|||
|
|||
## Just the glue please |
|||
|
|||
This code connects the public USB API and Electrum. Leverages all |
|||
the good work that's been done by the Electrum team to support |
|||
hardware wallets. |
|||
|
|||
## Background |
|||
|
|||
The Coldcard has a larger screen (128x64) and a number pad. For |
|||
this reason, all PIN code entry is done directly on the device. |
|||
Coldcard does not appear on the USB bus until unlocked with appropriate |
|||
PIN. Initial setup, and seed generation must be done offline. |
|||
|
|||
Coldcard uses an emerging standard for unsigned tranasctions: |
|||
|
|||
PSBT = Partially Signed Bitcoin Transaction = BIP174 |
|||
|
|||
However, this spec is still under heavy discussion and in flux. At |
|||
this point, the PSBT files generated will only be compatible with |
|||
Coldcard. |
|||
|
|||
The Coldcard can be used 100% offline: it can generate a skeleton |
|||
Electrum wallet and save it to MicroSD card. Transport that file |
|||
to Electrum and it will fetch history, blockchain details and then |
|||
operate in "unpaired" mode. |
|||
|
|||
Spending transactions can be saved to MicroSD using the "Export PSBT" |
|||
button on the transaction preview dialog (when this plugin is |
|||
owner of the wallet). That PSBT can be signed on the Coldcard |
|||
(again using MicroSD both ways). The result is a ready-to-transmit |
|||
bitcoin transaction, which can be transmitted using Tools > Load |
|||
Transaction > From File in Electrum or really any tool. |
|||
|
|||
<https://coldcardwallet.com> |
|||
|
|||
## TODO Items |
|||
|
|||
- No effort yet to support translations or languages other than English, sorry. |
|||
- Coldcard PSBT format is not likely to be compatible with other devices, because the BIP174 is still in flux. |
|||
- Segwit support not 100% complete: can pay to them, but cannot setup wallet to receive them. |
|||
- Limited support for segwit wrapped in P2SH. |
|||
- Someday we could support multisig hardware wallets based on PSBT where each participant |
|||
is using different devices/systems for signing, however, that belongs in an independant |
|||
plugin that is PSBT focused and might not require a Coldcard to be present. |
|||
|
|||
### Ctags |
|||
|
|||
- I find this command useful (at top level) ... but I'm a VIM user. |
|||
|
|||
ctags -f .tags electrum `find . -name ENV -prune -o -name \*.py` |
|||
|
|||
|
|||
### Working with latest ckcc-protocol |
|||
|
|||
- at top level, do this: |
|||
|
|||
pip install -e git+ssh://git@github.com/Coldcard/ckcc-protocol.git#egg=ckcc-protocol |
|||
|
|||
- but you'll need the https version of that, not ssh like I can. |
|||
- also a branch name would be good in there |
|||
- do `pip uninstall ckcc` first |
|||
- see <https://stackoverflow.com/questions/4830856> |
@ -0,0 +1,7 @@ |
|||
from electrum.i18n import _ |
|||
|
|||
fullname = 'Coldcard Wallet' |
|||
description = 'Provides support for the Coldcard hardware wallet from Coinkite' |
|||
requires = [('ckcc-protocol', 'github.com/Coldcard/ckcc-protocol')] |
|||
registers_keystore = ('hardware', 'coldcard', _("Coldcard Wallet")) |
|||
available_for = ['qt', 'cmdline'] |
@ -0,0 +1,47 @@ |
|||
from electrum.plugin import hook |
|||
from .coldcard import ColdcardPlugin |
|||
from electrum.util import print_msg, print_error, raw_input, print_stderr |
|||
|
|||
class ColdcardCmdLineHandler: |
|||
|
|||
def get_passphrase(self, msg, confirm): |
|||
raise NotImplementedError |
|||
|
|||
def get_pin(self, msg): |
|||
raise NotImplementedError |
|||
|
|||
def prompt_auth(self, msg): |
|||
raise NotImplementedError |
|||
|
|||
def yes_no_question(self, msg): |
|||
print_msg(msg) |
|||
return raw_input() in 'yY' |
|||
|
|||
def stop(self): |
|||
pass |
|||
|
|||
def show_message(self, msg, on_cancel=None): |
|||
print_stderr(msg) |
|||
|
|||
def show_error(self, msg, blocking=False): |
|||
print_error(msg) |
|||
|
|||
def update_status(self, b): |
|||
print_error('hw device status', b) |
|||
|
|||
def finished(self): |
|||
pass |
|||
|
|||
class Plugin(ColdcardPlugin): |
|||
handler = ColdcardCmdLineHandler() |
|||
|
|||
@hook |
|||
def init_keystore(self, keystore): |
|||
if not isinstance(keystore, self.keystore_class): |
|||
return |
|||
keystore.handler = self.handler |
|||
|
|||
def create_handler(self, window): |
|||
return self.handler |
|||
|
|||
# EOF |
@ -0,0 +1,684 @@ |
|||
# |
|||
# Coldcard Electrum plugin main code. |
|||
# |
|||
# |
|||
from struct import pack, unpack |
|||
import hashlib |
|||
import os, sys, time, io |
|||
import traceback |
|||
|
|||
from electrum import bitcoin |
|||
from electrum.bitcoin import serialize_xpub, deserialize_xpub, InvalidMasterKeyVersionBytes |
|||
from electrum import constants |
|||
from electrum.bitcoin import TYPE_ADDRESS, int_to_hex |
|||
from electrum.i18n import _ |
|||
from electrum.plugin import BasePlugin, Device |
|||
from electrum.keystore import Hardware_KeyStore, xpubkey_to_pubkey, Xpub |
|||
from electrum.transaction import Transaction |
|||
from electrum.wallet import Standard_Wallet |
|||
from electrum.crypto import hash_160 |
|||
from ..hw_wallet import HW_PluginBase |
|||
from ..hw_wallet.plugin import is_any_tx_output_on_change_branch |
|||
from electrum.util import print_error, bfh, bh2u, versiontuple |
|||
from electrum.base_wizard import ScriptTypeNotSupported |
|||
|
|||
try: |
|||
import hid |
|||
from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker |
|||
from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError |
|||
from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN, |
|||
AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH) |
|||
from ckcc.constants import ( |
|||
PSBT_GLOBAL_UNSIGNED_TX, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO, |
|||
PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, |
|||
PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION) |
|||
|
|||
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH |
|||
|
|||
requirements_ok = True |
|||
|
|||
|
|||
class ElectrumColdcardDevice(ColdcardDevice): |
|||
# avoid use of pycoin for MiTM message signature test |
|||
def mitm_verify(self, sig, expect_xpub): |
|||
# verify a signature (65 bytes) over the session key, using the master bip32 node |
|||
# - customized to use specific EC library of Electrum. |
|||
from electrum.ecc import ECPubkey |
|||
|
|||
xtype, depth, parent_fingerprint, child_number, chain_code, K_or_k \ |
|||
= bitcoin.deserialize_xpub(expect_xpub) |
|||
|
|||
pubkey = ECPubkey(K_or_k) |
|||
try: |
|||
pubkey.verify_message_hash(sig[1:65], self.session_key) |
|||
return True |
|||
except: |
|||
return False |
|||
|
|||
except ImportError: |
|||
requirements_ok = False |
|||
|
|||
COINKITE_VID = 0xd13e |
|||
CKCC_PID = 0xcc10 |
|||
|
|||
CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa |
|||
|
|||
def my_var_int(l): |
|||
# Bitcoin serialization of integers... directly into binary! |
|||
if l < 253: |
|||
return pack("B", l) |
|||
elif l < 0x10000: |
|||
return pack("<BH", 253, l) |
|||
elif l < 0x100000000: |
|||
return pack("<BI", 254, l) |
|||
else: |
|||
return pack("<BQ", 255, l) |
|||
|
|||
def xfp_from_xpub(xpub): |
|||
# sometime we need to BIP32 fingerprint value: 4 bytes of ripemd(sha256(pubkey)) |
|||
# UNTESTED |
|||
kk = bfh(Xpub.get_pubkey_from_xpub(xpub, [])) |
|||
assert len(kk) == 33 |
|||
xfp, = unpack('<I', hash_160(kk)[0:4]) |
|||
return xfp |
|||
|
|||
|
|||
class CKCCClient: |
|||
# Challenge: I haven't found anywhere that defines a base class for this 'client', |
|||
# nor an API (interface) to be met. Winging it. Gets called from lib/plugins.py mostly? |
|||
|
|||
def __init__(self, plugin, handler, dev_path, is_simulator=False): |
|||
self.device = plugin.device |
|||
self.handler = handler |
|||
|
|||
# if we know what the (xfp, xpub) "should be" then track it here |
|||
self._expected_device = None |
|||
|
|||
if is_simulator: |
|||
self.dev = ElectrumColdcardDevice(dev_path, encrypt=True) |
|||
else: |
|||
# open the real HID device |
|||
import hid |
|||
hd = hid.device(path=dev_path) |
|||
hd.open_path(dev_path) |
|||
|
|||
self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True) |
|||
|
|||
# NOTE: MiTM test is delayed until we have a hint as to what XPUB we |
|||
# should expect. It's also kinda slow. |
|||
|
|||
def __repr__(self): |
|||
return '<CKCCClient: xfp=%08x label=%r>' % (self.dev.master_fingerprint, |
|||
self.label()) |
|||
|
|||
def verify_connection(self, expected_xfp, expected_xpub): |
|||
ex = (expected_xfp, expected_xpub) |
|||
|
|||
if self._expected_device == ex: |
|||
# all is as expected |
|||
return |
|||
|
|||
if ( (self._expected_device is not None) |
|||
or (self.dev.master_fingerprint != expected_xfp) |
|||
or (self.dev.master_xpub != expected_xpub)): |
|||
# probably indicating programing error, not hacking |
|||
raise RuntimeError("Expecting 0x%08x but that's not whats connected?!" % |
|||
expected_xfp) |
|||
|
|||
# check signature over session key |
|||
# - mitm might have lied about xfp and xpub up to here |
|||
# - important that we use value capture at wallet creation time, not some value |
|||
# we read over USB today |
|||
self.dev.check_mitm(expected_xpub=expected_xpub) |
|||
|
|||
self._expected_device = ex |
|||
|
|||
print_error("[coldcard]", "Successfully verified against MiTM") |
|||
|
|||
def is_pairable(self): |
|||
# can't do anything w/ devices that aren't setup (but not normally reachable) |
|||
return bool(self.dev.master_xpub) |
|||
|
|||
def timeout(self, cutoff): |
|||
# nothing to do? |
|||
pass |
|||
|
|||
def close(self): |
|||
# close the HID device (so can be reused) |
|||
self.dev.close() |
|||
self.dev = None |
|||
|
|||
def is_initialized(self): |
|||
return bool(self.dev.master_xpub) |
|||
|
|||
def label(self): |
|||
# 'label' of this Coldcard. Warning: gets saved into wallet file, which might |
|||
# not be encrypted, so better for privacy if based on xpub/fingerprint rather than |
|||
# USB serial number. |
|||
if self.dev.is_simulator: |
|||
lab = 'Coldcard Simulator 0x%08x' % self.dev.master_fingerprint |
|||
elif not self.dev.master_fingerprint: |
|||
# failback; not expected |
|||
lab = 'Coldcard #' + self.dev.serial |
|||
else: |
|||
lab = 'Coldcard 0x%08x' % self.dev.master_fingerprint |
|||
|
|||
# Hack zone: during initial setup I need the xfp and master xpub but |
|||
# very few objects are passed between the various steps of base_wizard. |
|||
# Solution: return a string with some hidden metadata |
|||
# - see <https://stackoverflow.com/questions/7172772/abc-for-string> |
|||
# - needs to work w/ deepcopy |
|||
class LabelStr(str): |
|||
def __new__(cls, s, xfp=None, xpub=None): |
|||
self = super().__new__(cls, str(s)) |
|||
self.xfp = getattr(s, 'xfp', xfp) |
|||
self.xpub = getattr(s, 'xpub', xpub) |
|||
return self |
|||
|
|||
return LabelStr(lab, self.dev.master_fingerprint, self.dev.master_xpub) |
|||
|
|||
def has_usable_connection_with_device(self): |
|||
# Do end-to-end ping test |
|||
try: |
|||
self.ping_check() |
|||
return True |
|||
except: |
|||
return False |
|||
|
|||
def get_xpub(self, bip32_path, xtype): |
|||
assert xtype in ColdcardPlugin.SUPPORTED_XTYPES |
|||
print_error('[coldcard]', 'Derive xtype = %r' % xtype) |
|||
xpub = self.dev.send_recv(CCProtocolPacker.get_xpub(bip32_path), timeout=5000) |
|||
# TODO handle timeout? |
|||
# change type of xpub to the requested type |
|||
try: |
|||
__, depth, fingerprint, child_number, c, cK = deserialize_xpub(xpub) |
|||
except InvalidMasterKeyVersionBytes: |
|||
raise Exception(_('Invalid xpub magic. Make sure your {} device is set to the correct chain.') |
|||
.format(self.device)) from None |
|||
if xtype != 'standard': |
|||
xpub = serialize_xpub(xtype, c, cK, depth, fingerprint, child_number) |
|||
return xpub |
|||
|
|||
def ping_check(self): |
|||
# check connection is working |
|||
assert self.dev.session_key, 'not encrypted?' |
|||
req = b'1234 Electrum Plugin 4321' # free up to 59 bytes |
|||
try: |
|||
echo = self.dev.send_recv(CCProtocolPacker.ping(req)) |
|||
assert echo == req |
|||
except: |
|||
raise RuntimeError("Communication trouble with Coldcard") |
|||
|
|||
def show_address(self, path, addr_fmt): |
|||
# prompt user w/ addres, also returns it immediately. |
|||
return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) |
|||
|
|||
def get_version(self): |
|||
# gives list of strings |
|||
return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n') |
|||
|
|||
def sign_message_start(self, path, msg): |
|||
# this starts the UX experience. |
|||
self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None) |
|||
|
|||
def sign_message_poll(self): |
|||
# poll device... if user has approved, will get tuple: (addr, sig) else None |
|||
return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) |
|||
|
|||
def sign_transaction_start(self, raw_psbt, finalize=True): |
|||
# Multiple steps to sign: |
|||
# - upload binary |
|||
# - start signing UX |
|||
# - wait for coldcard to complete process, or have it refused. |
|||
# - download resulting txn |
|||
assert 20 <= len(raw_psbt) < MAX_TXN_LEN, 'PSBT is too big' |
|||
dlen, chk = self.dev.upload_file(raw_psbt) |
|||
|
|||
resp = self.dev.send_recv(CCProtocolPacker.sign_transaction(dlen, chk, finalize=finalize), |
|||
timeout=None) |
|||
|
|||
if resp != None: |
|||
raise ValueError(resp) |
|||
|
|||
def sign_transaction_poll(self): |
|||
# poll device... if user has approved, will get tuple: (legnth, checksum) else None |
|||
return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) |
|||
|
|||
def download_file(self, length, checksum, file_number=1): |
|||
# get a file |
|||
return self.dev.download_file(length, checksum, file_number=file_number) |
|||
|
|||
|
|||
|
|||
class Coldcard_KeyStore(Hardware_KeyStore): |
|||
hw_type = 'coldcard' |
|||
device = 'Coldcard' |
|||
|
|||
def __init__(self, d): |
|||
Hardware_KeyStore.__init__(self, d) |
|||
# Errors and other user interaction is done through the wallet's |
|||
# handler. The handler is per-window and preserved across |
|||
# device reconnects |
|||
self.force_watching_only = False |
|||
self.ux_busy = False |
|||
|
|||
# Seems like only the derivation path and resulting **derived** xpub is stored in |
|||
# the wallet file... however, we need to know at least the fingerprint of the master |
|||
# xpub to verify against MiTM, and also so we can put the right value into the subkey paths |
|||
# of PSBT files that might be generated offline. |
|||
# - save the fingerprint of the master xpub, as "xfp" |
|||
# - it's a LE32 int, but hex more natural way to see it |
|||
# - device reports these value during encryption setup process |
|||
lab = d['label'] |
|||
if hasattr(lab, 'xfp'): |
|||
# initial setup |
|||
self.ckcc_xfp = lab.xfp |
|||
self.ckcc_xpub = lab.xpub |
|||
else: |
|||
# wallet load: fatal if missing, we need them! |
|||
self.ckcc_xfp = d['ckcc_xfp'] |
|||
self.ckcc_xpub = d['ckcc_xpub'] |
|||
|
|||
def dump(self): |
|||
# our additions to the stored data about keystore -- only during creation? |
|||
d = Hardware_KeyStore.dump(self) |
|||
|
|||
d['ckcc_xfp'] = self.ckcc_xfp |
|||
d['ckcc_xpub'] = self.ckcc_xpub |
|||
|
|||
return d |
|||
|
|||
def get_derivation(self): |
|||
return self.derivation |
|||
|
|||
def get_client(self): |
|||
# called when user tries to do something like view address, sign somthing. |
|||
# - not called during probing/setup |
|||
rv = self.plugin.get_client(self) |
|||
if rv: |
|||
rv.verify_connection(self.ckcc_xfp, self.ckcc_xpub) |
|||
|
|||
return rv |
|||
|
|||
def give_error(self, message, clear_client=False): |
|||
print_error(message) |
|||
if not self.ux_busy: |
|||
self.handler.show_error(message) |
|||
else: |
|||
self.ux_busy = False |
|||
if clear_client: |
|||
self.client = None |
|||
raise Exception(message) |
|||
|
|||
def wrap_busy(func): |
|||
# decorator: function takes over the UX on the device. |
|||
def wrapper(self, *args, **kwargs): |
|||
try: |
|||
self.ux_busy = True |
|||
return func(self, *args, **kwargs) |
|||
finally: |
|||
self.ux_busy = False |
|||
return wrapper |
|||
|
|||
def decrypt_message(self, pubkey, message, password): |
|||
raise RuntimeError(_('Encryption and decryption are currently not supported for {}').format(self.device)) |
|||
|
|||
@wrap_busy |
|||
def sign_message(self, sequence, message, password): |
|||
# Sign a message on device. Since we have big screen, of course we |
|||
# have to show the message unabiguously there first! |
|||
try: |
|||
msg = message.encode('ascii', errors='strict') |
|||
assert 1 <= len(msg) <= MSG_SIGNING_MAX_LENGTH |
|||
except (UnicodeError, AssertionError): |
|||
# there are other restrictions on message content, |
|||
# but let the device enforce and report those |
|||
self.handler.show_error('Only short (%d max) ASCII messages can be signed.' |
|||
% MSG_SIGNING_MAX_LENGTH) |
|||
return b'' |
|||
|
|||
client = self.get_client() |
|||
path = self.get_derivation() + ("/%d/%d" % sequence) |
|||
try: |
|||
cl = self.get_client() |
|||
try: |
|||
self.handler.show_message("Signing message (using %s)..." % path) |
|||
|
|||
cl.sign_message_start(path, msg) |
|||
|
|||
while 1: |
|||
# How to kill some time, without locking UI? |
|||
time.sleep(0.250) |
|||
|
|||
resp = cl.sign_message_poll() |
|||
if resp is not None: |
|||
break |
|||
|
|||
finally: |
|||
self.handler.finished() |
|||
|
|||
assert len(resp) == 2 |
|||
addr, raw_sig = resp |
|||
|
|||
# already encoded in Bitcoin fashion, binary. |
|||
assert 40 < len(raw_sig) <= 65 |
|||
|
|||
return raw_sig |
|||
|
|||
except (CCUserRefused, CCBusyError) as exc: |
|||
self.handler.show_error(str(exc)) |
|||
except CCProtoError as exc: |
|||
traceback.print_exc(file=sys.stderr) |
|||
self.handler.show_error('{}\n\n{}'.format( |
|||
_('Error showing address') + ':', str(exc))) |
|||
except Exception as e: |
|||
self.give_error(e, True) |
|||
|
|||
# give empty bytes for error cases; it seems to clear the old signature box |
|||
return b'' |
|||
|
|||
def build_psbt(self, tx, wallet=None, xfp=None): |
|||
# Render a PSBT file, for upload to Coldcard. |
|||
# |
|||
if xfp is None: |
|||
# need fingerprint of MASTER xpub, not the derived key |
|||
xfp = self.ckcc_xfp |
|||
|
|||
inputs = tx.inputs() |
|||
|
|||
if 'prev_tx' not in inputs[0]: |
|||
# fetch info about inputs, if needed? |
|||
# - needed during export PSBT flow, not normal online signing |
|||
assert wallet, 'need wallet reference' |
|||
wallet.add_hw_info(tx) |
|||
|
|||
# wallet.add_hw_info installs this attr |
|||
assert hasattr(tx, 'output_info'), 'need data about outputs' |
|||
|
|||
# Build map of pubkey needed as derivation from master, in PSBT binary format |
|||
# 1) binary version of the common subpath for all keys |
|||
# m/ => fingerprint LE32 |
|||
# a/b/c => ints |
|||
base_path = pack('<I', xfp) |
|||
for x in self.get_derivation()[2:].split('/'): |
|||
if x.endswith("'"): |
|||
x = int(x[:-1]) | 0x80000000 |
|||
else: |
|||
x = int(x) |
|||
base_path += pack('<I', x) |
|||
|
|||
# 2) all used keys in transaction |
|||
subkeys = {} |
|||
derivations = self.get_tx_derivations(tx) |
|||
for xpubkey in derivations: |
|||
pubkey = xpubkey_to_pubkey(xpubkey) |
|||
|
|||
# assuming depth two, non-harded: change + index |
|||
aa, bb = derivations[xpubkey] |
|||
assert 0 <= aa < 0x80000000 |
|||
assert 0 <= bb < 0x80000000 |
|||
|
|||
subkeys[bfh(pubkey)] = base_path + pack('<II', aa, bb) |
|||
|
|||
for txin in inputs: |
|||
if txin['type'] == 'coinbase': |
|||
self.give_error("Coinbase not supported") # but why not? |
|||
|
|||
if txin['type'] in ['p2sh']: |
|||
self.give_error('Not ready for multisig transactions yet') |
|||
|
|||
#if txin['type'] in ['p2wpkh-p2sh', 'p2wsh-p2sh']: |
|||
#if txin['type'] in ['p2wpkh', 'p2wsh']: |
|||
|
|||
# Construct PSBT from start to finish. |
|||
out_fd = io.BytesIO() |
|||
out_fd.write(b'psbt\xff') |
|||
|
|||
def write_kv(ktype, val, key=b''): |
|||
# serialize helper: write w/ size and key byte |
|||
out_fd.write(my_var_int(1 + len(key))) |
|||
out_fd.write(bytes([ktype]) + key) |
|||
|
|||
if isinstance(val, str): |
|||
val = bfh(val) |
|||
|
|||
out_fd.write(my_var_int(len(val))) |
|||
out_fd.write(val) |
|||
|
|||
|
|||
# global section: just the unsigned txn |
|||
class CustomTXSerialization(Transaction): |
|||
@classmethod |
|||
def input_script(cls, txin, estimate_size=False): |
|||
return '' |
|||
unsigned = bfh(CustomTXSerialization(tx.serialize()).serialize_to_network(witness=False)) |
|||
write_kv(PSBT_GLOBAL_UNSIGNED_TX, unsigned) |
|||
|
|||
# end globals section |
|||
out_fd.write(b'\x00') |
|||
|
|||
# inputs section |
|||
for txin in inputs: |
|||
utxo = txin['prev_tx'].outputs()[txin['prevout_n']] |
|||
spendable = txin['prev_tx'].serialize_output(utxo) |
|||
write_kv(PSBT_IN_WITNESS_UTXO, spendable) |
|||
|
|||
pubkeys, x_pubkeys = tx.get_sorted_pubkeys(txin) |
|||
|
|||
pubkeys = [bfh(k) for k in pubkeys] |
|||
|
|||
for k in pubkeys: |
|||
write_kv(PSBT_IN_BIP32_DERIVATION, subkeys[k], k) |
|||
|
|||
out_fd.write(b'\x00') |
|||
|
|||
# outputs section |
|||
for o in tx.outputs(): |
|||
# can be empty, but must be present, and helpful to show change inputs |
|||
# wallet.add_hw_info() adds some data about change outputs into tx.output_info |
|||
if o.address in tx.output_info: |
|||
# this address "is_mine" but might not be change (I like to sent to myself) |
|||
output_info = tx.output_info.get(o.address) |
|||
index, xpubs = output_info.address_index, output_info.sorted_xpubs |
|||
|
|||
if index[0] == 1 and len(index) == 2: |
|||
# it is a change output (based on our standard derivation path) |
|||
assert len(xpubs) == 1 # not expecting multisig |
|||
xpubkey = xpubs[0] |
|||
|
|||
# document its bip32 derivation in output section |
|||
aa, bb = index |
|||
assert 0 <= aa < 0x80000000 |
|||
assert 0 <= bb < 0x80000000 |
|||
|
|||
deriv = base_path + pack('<II', aa, bb) |
|||
pubkey = self.get_pubkey_from_xpub(xpubkey, index) |
|||
|
|||
write_kv(PSBT_OUT_BIP32_DERIVATION, deriv, bfh(pubkey)) |
|||
|
|||
out_fd.write(b'\x00') |
|||
|
|||
return out_fd.getvalue() |
|||
|
|||
|
|||
@wrap_busy |
|||
def sign_transaction(self, tx, password): |
|||
# Build a PSBT in memory, upload it for signing. |
|||
# - we can also work offline (without paired device present) |
|||
if tx.is_complete(): |
|||
return |
|||
|
|||
client = self.get_client() |
|||
|
|||
assert client.dev.master_fingerprint == self.ckcc_xfp |
|||
|
|||
raw_psbt = self.build_psbt(tx) |
|||
|
|||
#open('debug.psbt', 'wb').write(out_fd.getvalue()) |
|||
|
|||
try: |
|||
try: |
|||
self.handler.show_message("Authorize Transaction...") |
|||
|
|||
client.sign_transaction_start(raw_psbt, True) |
|||
|
|||
while 1: |
|||
# How to kill some time, without locking UI? |
|||
time.sleep(0.250) |
|||
|
|||
resp = client.sign_transaction_poll() |
|||
if resp is not None: |
|||
break |
|||
|
|||
rlen, rsha = resp |
|||
|
|||
# download the resulting txn. |
|||
new_raw = client.download_file(rlen, rsha) |
|||
|
|||
finally: |
|||
self.handler.finished() |
|||
|
|||
except (CCUserRefused, CCBusyError) as exc: |
|||
print_error('[coldcard]', 'Did not sign:', str(exc)) |
|||
self.handler.show_error(str(exc)) |
|||
return |
|||
except BaseException as e: |
|||
traceback.print_exc(file=sys.stderr) |
|||
self.give_error(e, True) |
|||
return |
|||
|
|||
# trust the coldcard to re-searilize final product right? |
|||
tx.update(bh2u(new_raw)) |
|||
|
|||
@staticmethod |
|||
def _encode_txin_type(txin_type): |
|||
# Map from Electrum code names to our code numbers. |
|||
return {'standard': AF_CLASSIC, 'p2pkh': AF_CLASSIC, |
|||
'p2sh': AF_P2SH, |
|||
'p2wpkh-p2sh': AF_P2WPKH_P2SH, |
|||
'p2wpkh': AF_P2WPKH, |
|||
'p2wsh-p2sh': AF_P2WSH_P2SH, |
|||
'p2wsh': AF_P2WSH, |
|||
}[txin_type] |
|||
|
|||
@wrap_busy |
|||
def show_address(self, sequence, txin_type): |
|||
client = self.get_client() |
|||
address_path = self.get_derivation()[2:] + "/%d/%d"%sequence |
|||
addr_fmt = self._encode_txin_type(txin_type) |
|||
try: |
|||
try: |
|||
self.handler.show_message(_("Showing address ...")) |
|||
dev_addr = client.show_address(address_path, addr_fmt) |
|||
# we could double check address here |
|||
finally: |
|||
self.handler.finished() |
|||
except CCProtoError as exc: |
|||
traceback.print_exc(file=sys.stderr) |
|||
self.handler.show_error('{}\n\n{}'.format( |
|||
_('Error showing address') + ':', str(exc))) |
|||
except BaseException as exc: |
|||
traceback.print_exc(file=sys.stderr) |
|||
self.handler.show_error(exc) |
|||
|
|||
|
|||
|
|||
class ColdcardPlugin(HW_PluginBase): |
|||
libraries_available = requirements_ok |
|||
keystore_class = Coldcard_KeyStore |
|||
client = None |
|||
|
|||
DEVICE_IDS = [ |
|||
(COINKITE_VID, CKCC_PID), |
|||
(COINKITE_VID, CKCC_SIMULATED_PID) |
|||
] |
|||
|
|||
#SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') |
|||
SUPPORTED_XTYPES = ('standard', 'p2wpkh') |
|||
|
|||
def __init__(self, parent, config, name): |
|||
HW_PluginBase.__init__(self, parent, config, name) |
|||
|
|||
if self.libraries_available: |
|||
self.device_manager().register_devices(self.DEVICE_IDS) |
|||
|
|||
self.device_manager().register_enumerate_func(self.detect_simulator) |
|||
|
|||
def detect_simulator(self): |
|||
# if there is a simulator running on this machine, |
|||
# return details about it so it's offered as a pairing choice |
|||
fn = CKCC_SIMULATOR_PATH |
|||
|
|||
if os.path.exists(fn): |
|||
return [Device(fn, -1, fn, (COINKITE_VID, CKCC_SIMULATED_PID), 0)] |
|||
|
|||
return [] |
|||
|
|||
|
|||
def create_client(self, device, handler): |
|||
if handler: |
|||
self.handler = handler |
|||
|
|||
# We are given a HID device, or at least some details about it. |
|||
# Not sure why not we aren't just given a HID library handle, but |
|||
# the 'path' is unabiguous, so we'll use that. |
|||
try: |
|||
rv = CKCCClient(self, handler, device.path, |
|||
is_simulator=(device.product_key[1] == CKCC_SIMULATED_PID)) |
|||
return rv |
|||
except: |
|||
self.print_error('late failure connecting to device?') |
|||
return None |
|||
|
|||
def setup_device(self, device_info, wizard, purpose): |
|||
devmgr = self.device_manager() |
|||
device_id = device_info.device.id_ |
|||
client = devmgr.client_by_id(device_id) |
|||
if client is None: |
|||
raise Exception(_('Failed to create a client for this device.') + '\n' + |
|||
_('Make sure it is in the correct state.')) |
|||
client.handler = self.create_handler(wizard) |
|||
|
|||
def get_xpub(self, device_id, derivation, xtype, wizard): |
|||
# this seems to be part of the pairing process only, not during normal ops? |
|||
# base_wizard:on_hw_derivation |
|||
if xtype not in self.SUPPORTED_XTYPES: |
|||
raise ScriptTypeNotSupported(_('This type of script is not supported with {}.').format(self.device)) |
|||
devmgr = self.device_manager() |
|||
client = devmgr.client_by_id(device_id) |
|||
client.handler = self.create_handler(wizard) |
|||
client.ping_check() |
|||
|
|||
xpub = client.get_xpub(derivation, xtype) |
|||
return xpub |
|||
|
|||
def get_client(self, keystore, force_pair=True): |
|||
# All client interaction should not be in the main GUI thread |
|||
devmgr = self.device_manager() |
|||
handler = keystore.handler |
|||
with devmgr.hid_lock: |
|||
client = devmgr.client_for_keystore(self, handler, keystore, force_pair) |
|||
# returns the client for a given keystore. can use xpub |
|||
#if client: |
|||
# client.used() |
|||
if client is not None: |
|||
client.ping_check() |
|||
return client |
|||
|
|||
def show_address(self, wallet, address, keystore=None): |
|||
if keystore is None: |
|||
keystore = wallet.get_keystore() |
|||
if not self.show_address_helper(wallet, address, keystore): |
|||
return |
|||
|
|||
# Standard_Wallet => not multisig, must be bip32 |
|||
if type(wallet) is not Standard_Wallet: |
|||
keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device)) |
|||
return |
|||
|
|||
sequence = wallet.get_address_index(address) |
|||
txin_type = wallet.get_txin_type(address) |
|||
keystore.show_address(sequence, txin_type) |
|||
|
|||
# EOF |
@ -0,0 +1,242 @@ |
|||
import time |
|||
|
|||
from electrum.i18n import _ |
|||
from electrum.plugin import hook |
|||
from electrum.wallet import Standard_Wallet |
|||
from electrum.gui.qt.util import * |
|||
|
|||
from .coldcard import ColdcardPlugin |
|||
from ..hw_wallet.qt import QtHandlerBase, QtPluginBase |
|||
|
|||
|
|||
class Plugin(ColdcardPlugin, QtPluginBase): |
|||
icon_unpaired = ":icons/coldcard_unpaired.png" |
|||
icon_paired = ":icons/coldcard.png" |
|||
|
|||
def create_handler(self, window): |
|||
return Coldcard_Handler(window) |
|||
|
|||
@hook |
|||
def receive_menu(self, menu, addrs, wallet): |
|||
if type(wallet) is not Standard_Wallet: |
|||
return |
|||
keystore = wallet.get_keystore() |
|||
if type(keystore) == self.keystore_class and len(addrs) == 1: |
|||
def show_address(): |
|||
keystore.thread.add(partial(self.show_address, wallet, addrs[0])) |
|||
menu.addAction(_("Show on Coldcard"), show_address) |
|||
|
|||
@hook |
|||
def transaction_dialog(self, dia): |
|||
# see gui/qt/transaction_dialog.py |
|||
|
|||
keystore = dia.wallet.get_keystore() |
|||
if type(keystore) != self.keystore_class: |
|||
# not a Coldcard wallet, hide feature |
|||
return |
|||
|
|||
# - add a new button, near "export" |
|||
btn = QPushButton(_("Save PSBT")) |
|||
btn.clicked.connect(lambda unused: self.export_psbt(dia)) |
|||
if dia.tx.is_complete(): |
|||
# but disable it for signed transactions (nothing to do if already signed) |
|||
btn.setDisabled(True) |
|||
|
|||
dia.sharing_buttons.append(btn) |
|||
|
|||
def export_psbt(self, dia): |
|||
# Called from hook in transaction dialog |
|||
tx = dia.tx |
|||
|
|||
if tx.is_complete(): |
|||
# if they sign while dialog is open, it can transition from unsigned to signed, |
|||
# which we don't support here, so do nothing |
|||
return |
|||
|
|||
# can only expect Coldcard wallets to work with these files (right now) |
|||
keystore = dia.wallet.get_keystore() |
|||
assert type(keystore) == self.keystore_class |
|||
|
|||
# convert to PSBT |
|||
raw_psbt = keystore.build_psbt(tx, wallet=dia.wallet) |
|||
|
|||
name = (dia.wallet.basename() + time.strftime('-%y%m%d-%H%M.psbt')).replace(' ', '-') |
|||
fileName = dia.main_window.getSaveFileName(_("Select where to save the PSBT file"), |
|||
name, "*.psbt") |
|||
if fileName: |
|||
with open(fileName, "wb+") as f: |
|||
f.write(raw_psbt) |
|||
dia.show_message(_("Transaction exported successfully")) |
|||
dia.saved = True |
|||
|
|||
def show_settings_dialog(self, window, keystore): |
|||
# When they click on the icon for CC we come here. |
|||
device_id = self.choose_device(window, keystore) |
|||
if device_id: |
|||
CKCCSettingsDialog(window, self, keystore, device_id).exec_() |
|||
|
|||
|
|||
class Coldcard_Handler(QtHandlerBase): |
|||
setup_signal = pyqtSignal() |
|||
#auth_signal = pyqtSignal(object) |
|||
|
|||
def __init__(self, win): |
|||
super(Coldcard_Handler, self).__init__(win, 'Coldcard') |
|||
self.setup_signal.connect(self.setup_dialog) |
|||
#self.auth_signal.connect(self.auth_dialog) |
|||
|
|||
|
|||
def message_dialog(self, msg): |
|||
self.clear_dialog() |
|||
self.dialog = dialog = WindowModalDialog(self.top_level_window(), _("Coldcard Status")) |
|||
l = QLabel(msg) |
|||
vbox = QVBoxLayout(dialog) |
|||
vbox.addWidget(l) |
|||
dialog.show() |
|||
|
|||
def get_setup(self): |
|||
self.done.clear() |
|||
self.setup_signal.emit() |
|||
self.done.wait() |
|||
return |
|||
|
|||
def setup_dialog(self): |
|||
self.show_error(_('Please initialization your Coldcard while disconnected.')) |
|||
return |
|||
|
|||
class CKCCSettingsDialog(WindowModalDialog): |
|||
'''This dialog doesn't require a device be paired with a wallet. |
|||
We want users to be able to wipe a device even if they've forgotten |
|||
their PIN.''' |
|||
|
|||
def __init__(self, window, plugin, keystore, device_id): |
|||
title = _("{} Settings").format(plugin.device) |
|||
super(CKCCSettingsDialog, self).__init__(window, title) |
|||
self.setMaximumWidth(540) |
|||
|
|||
devmgr = plugin.device_manager() |
|||
config = devmgr.config |
|||
handler = keystore.handler |
|||
self.thread = thread = keystore.thread |
|||
|
|||
def connect_and_doit(): |
|||
client = devmgr.client_by_id(device_id) |
|||
if not client: |
|||
raise RuntimeError("Device not connected") |
|||
return client |
|||
|
|||
body = QWidget() |
|||
body_layout = QVBoxLayout(body) |
|||
grid = QGridLayout() |
|||
grid.setColumnStretch(2, 1) |
|||
|
|||
# see <http://doc.qt.io/archives/qt-4.8/richtext-html-subset.html> |
|||
title = QLabel('''<center> |
|||
<span style="font-size: x-large">Coldcard Wallet</span> |
|||
<br><span style="font-size: medium">from Coinkite Inc.</span> |
|||
<br><a href="https://coldcardwallet.com">coldcardwallet.com</a>''') |
|||
title.setTextInteractionFlags(Qt.LinksAccessibleByMouse) |
|||
|
|||
grid.addWidget(title , 0,0, 1,2, Qt.AlignHCenter) |
|||
y = 3 |
|||
|
|||
rows = [ |
|||
('fw_version', _("Firmware Version")), |
|||
('fw_built', _("Build Date")), |
|||
('bl_version', _("Bootloader")), |
|||
('xfp', _("Master Fingerprint")), |
|||
('serial', _("USB Serial")), |
|||
] |
|||
for row_num, (member_name, label) in enumerate(rows): |
|||
widget = QLabel('<tt>000000000000') |
|||
widget.setTextInteractionFlags(Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard) |
|||
|
|||
grid.addWidget(QLabel(label), y, 0, 1,1, Qt.AlignRight) |
|||
grid.addWidget(widget, y, 1, 1, 1, Qt.AlignLeft) |
|||
setattr(self, member_name, widget) |
|||
y += 1 |
|||
body_layout.addLayout(grid) |
|||
|
|||
upg_btn = QPushButton('Upgrade') |
|||
#upg_btn.setDefault(False) |
|||
def _start_upgrade(): |
|||
thread.add(connect_and_doit, on_success=self.start_upgrade) |
|||
upg_btn.clicked.connect(_start_upgrade) |
|||
|
|||
y += 3 |
|||
grid.addWidget(upg_btn, y, 0) |
|||
grid.addWidget(CloseButton(self), y, 1) |
|||
|
|||
dialog_vbox = QVBoxLayout(self) |
|||
dialog_vbox.addWidget(body) |
|||
|
|||
# Fetch values and show them |
|||
thread.add(connect_and_doit, on_success=self.show_values) |
|||
|
|||
def show_values(self, client): |
|||
dev = client.dev |
|||
|
|||
self.xfp.setText('<tt>0x%08x' % dev.master_fingerprint) |
|||
self.serial.setText('<tt>%s' % dev.serial) |
|||
|
|||
# ask device for versions: allow extras for future |
|||
fw_date, fw_rel, bl_rel, *rfu = client.get_version() |
|||
|
|||
self.fw_version.setText('<tt>%s' % fw_rel) |
|||
self.fw_built.setText('<tt>%s' % fw_date) |
|||
self.bl_version.setText('<tt>%s' % bl_rel) |
|||
|
|||
def start_upgrade(self, client): |
|||
# ask for a filename (must have already downloaded it) |
|||
mw = get_parent_main_window(self) |
|||
dev = client.dev |
|||
|
|||
fileName = mw.getOpenFileName("Select upgraded firmware file", "*.dfu") |
|||
if not fileName: |
|||
return |
|||
|
|||
from ckcc.utils import dfu_parse |
|||
from ckcc.sigheader import FW_HEADER_SIZE, FW_HEADER_OFFSET, FW_HEADER_MAGIC |
|||
from ckcc.protocol import CCProtocolPacker |
|||
from hashlib import sha256 |
|||
import struct |
|||
|
|||
try: |
|||
with open(fileName, 'rb') as fd: |
|||
|
|||
# unwrap firmware from the DFU |
|||
offset, size, *ignored = dfu_parse(fd) |
|||
|
|||
fd.seek(offset) |
|||
firmware = fd.read(size) |
|||
|
|||
hpos = FW_HEADER_OFFSET |
|||
hdr = bytes(firmware[hpos:hpos + FW_HEADER_SIZE]) # needed later too |
|||
magic = struct.unpack_from("<I", hdr)[0] |
|||
|
|||
if magic != FW_HEADER_MAGIC: |
|||
raise ValueError("Bad magic") |
|||
except Exception as exc: |
|||
mw.show_error("Does not appear to be a Coldcard firmware file.\n\n%s" % exc) |
|||
return |
|||
|
|||
# TODO: |
|||
# - detect if they are trying to downgrade; aint gonna work |
|||
# - warn them about the reboot? |
|||
# - length checks |
|||
# - add progress local bar |
|||
mw.show_message("Ready to Upgrade.\n\nBe patient. Unit will reboot itself when complete.") |
|||
|
|||
def doit(): |
|||
dlen, _ = dev.upload_file(firmware, verify=True) |
|||
assert dlen == len(firmware) |
|||
|
|||
# append the firmware header a second time |
|||
result = dev.send_recv(CCProtocolPacker.upload(size, size+FW_HEADER_SIZE, hdr)) |
|||
|
|||
# make it reboot into bootlaoder which might install it |
|||
dev.send_recv(CCProtocolPacker.reboot()) |
|||
|
|||
self.thread.add(doit) |
|||
self.close() |
|||
# EOF |
After Width: | Height: | Size: 528 B |
After Width: | Height: | Size: 788 B |
Loading…
Reference in new issue