Browse Source

Add multisig support for Coldcard plugin

-----

Squashed commit of the following:

commit 69c0d48108314db6f0e100bea2ce5a9a3a0e9a1f
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Aug 2 14:51:33 2019 -0400

    deterministic-build/requirements-hw.txt: update to version 0.7.9 of ckcc-protocol for Coldcard

commit 5cd2c528698dfb4ad248844be3c741c25aa33e38
Merge: 5e2a36a3e 537b35586
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Aug 2 14:41:59 2019 -0400

    Merge branch 'multisig' of github.com:Coldcard/electrum into multisig

commit 5e2a36a3ee28780a11f789f69896e6e795621bfc
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Aug 2 14:41:49 2019 -0400

    Some fixes for p2wsh-p2sh and p2wsh cases

commit 537b35586e0b1e11622a8e7d718b6fd37d47f952
Merge: a9e3ca47e 2a80f6a3a
Author: nvk <rodolfo@rnvk.org>
Date:   Tue Jul 23 11:40:39 2019 -0400

    Merge branch 'master' into multisig

commit a9e3ca47e189bcf0556703a4f2ca0c084638eb73
Author: Peter D. Gray <peter@conalgo.com>
Date:   Mon Jun 24 13:36:41 2019 -0400

    Bugfix: not all keystores have labels

commit 57783ec158af5ca8d63d596638bc3b6ee63b053f
Author: Peter D. Gray <peter@conalgo.com>
Date:   Mon Jun 24 13:36:04 2019 -0400

    Add address format to export data, and bugfix: use xfp_for_keystore()

commit 6f1f7673eaa340d14497b11c2453f03a73b38850
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Jun 21 09:06:49 2019 -0400

    Revert "bugfix: P2SH inputs can be signed with extra signatures, more than required"

    This reverts commit 75b6b663eca9e7b5edc9a463f7acd8f1c0f0a61a.

commit c322fb6dd2783e1103f5bf69ce60a365fbaf4bfe
Author: Peter D. Gray <peter@conalgo.com>
Date:   Thu Jun 20 12:57:19 2019 -0400

    Require latest CKCC protocol

commit 69a5b781ebc182851d2e25319b549ec58ea23eb1
Author: Peter D. Gray <peter@conalgo.com>
Date:   Thu Jun 20 12:40:27 2019 -0400

    gui/qt/main_window.py: add co-signer keystore label to wallet info display, and a hook for different buttons

commit 55d506d264dbb341602630c3429134e493995272
Author: Peter D. Gray <peter@conalgo.com>
Date:   Thu Jun 20 12:36:10 2019 -0400

    PSBT Combining/cleanup

commit 75b6b663eca9e7b5edc9a463f7acd8f1c0f0a61a
Author: Peter D. Gray <peter@conalgo.com>
Date:   Thu Jun 20 10:18:02 2019 -0400

    bugfix: P2SH inputs can be signed with extra signatures, more than required

commit 1bde362ddbbfd86520a7cb7bc51e0bcef06be078
Author: Peter D. Gray <peter@conalgo.com>
Date:   Wed Jun 19 09:47:26 2019 -0400

    Combines signed PSBT files

commit cc5c0532e52fbe282e862e20c250cc88ed435cad
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Jun 14 13:04:32 2019 -0400

    Working towards multisig

commit cb20da5428ba97237006683133e10b0758999966
Author: Peter D. Gray <peter@conalgo.com>
Date:   Fri Jun 14 13:04:18 2019 -0400

    Refactor/import PSBT handling code into own files

commit 558ef82bb0a8c16fb4e8bd0a6a80190498f1ce57
Author: Peter D. Gray <peter@conalgo.com>
Date:   Tue May 28 13:26:10 2019 -0400

    plugins/hw_wallet/qt.py: show keystore label in tooltip

commit 269299df4a9eb5960b6c6ec0afcbf3ef69ad0be3
Author: Peter D. Gray <peter@conalgo.com>
Date:   Mon May 27 09:32:43 2019 -0400

    Swap endian of xpub fingprint values, so they are shown as BE32 in capitalized hex, rather than 0x%08x (LE32)
dependabot/pip/contrib/deterministic-build/ecdsa-0.13.3
Peter D. Gray 6 years ago
committed by SomberNight
parent
commit
4baab751a4
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 2
      contrib/requirements/requirements-hw.txt
  2. 22
      electrum/gui/qt/main_window.py
  3. 312
      electrum/plugins/coldcard/basic_psbt.py
  4. 391
      electrum/plugins/coldcard/build_psbt.py
  5. 361
      electrum/plugins/coldcard/coldcard.py
  6. 210
      electrum/plugins/coldcard/qt.py
  7. 2
      electrum/plugins/hw_wallet/qt.py

2
contrib/requirements/requirements-hw.txt

@ -2,5 +2,5 @@ trezor[hidapi]>=0.11.0
safet[hidapi]>=0.1.0 safet[hidapi]>=0.1.0
keepkey>=6.0.3 keepkey>=6.0.3
btchip-python>=0.1.26 btchip-python>=0.1.26
ckcc-protocol>=0.7.2 ckcc-protocol>=0.7.7
hidapi hidapi

22
electrum/gui/qt/main_window.py

@ -2390,29 +2390,39 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
ks_type = str(keystore_types[0]) if keystore_types else _('No keystore') ks_type = str(keystore_types[0]) if keystore_types else _('No keystore')
grid.addWidget(QLabel(ks_type), 4, 1) grid.addWidget(QLabel(ks_type), 4, 1)
vbox.addLayout(grid) vbox.addLayout(grid)
if self.wallet.is_deterministic(): if self.wallet.is_deterministic():
mpk_text = ShowQRTextEdit() mpk_text = ShowQRTextEdit()
mpk_text.setMaximumHeight(150) mpk_text.setMaximumHeight(150)
mpk_text.addCopyButton(self.app) mpk_text.addCopyButton(self.app)
def show_mpk(index): def show_mpk(index):
mpk_text.setText(mpk_list[index]) mpk_text.setText(mpk_list[index])
mpk_text.repaint() # macOS hack for #4777 mpk_text.repaint() # macOS hack for #4777
# only show the combobox in case multiple accounts are available # only show the combobox in case multiple accounts are available
if len(mpk_list) > 1: if len(mpk_list) > 1:
def label(key): # only show the combobox if multiple master keys are defined
if isinstance(self.wallet, Multisig_Wallet): def label(idx, ks):
return _("cosigner") + f' {key+1} ( keystore: {keystore_types[key]} )' if isinstance(self.wallet, Multisig_Wallet) and hasattr(ks, 'label'):
return '' return _("cosigner") + f' {idx+1}: {ks.get_type_text()} {ks.label}'
labels = [label(i) for i in range(len(mpk_list))] else:
return _("keystore") + f' {idx+1}'
labels = [label(idx, ks) for idx, ks in enumerate(self.wallet.get_keystores())]
on_click = lambda clayout: show_mpk(clayout.selected_index()) on_click = lambda clayout: show_mpk(clayout.selected_index())
labels_clayout = ChoicesLayout(_("Master Public Keys"), labels, on_click) labels_clayout = ChoicesLayout(_("Master Public Keys"), labels, on_click)
vbox.addLayout(labels_clayout.layout()) vbox.addLayout(labels_clayout.layout())
else: else:
vbox.addWidget(QLabel(_("Master Public Key"))) vbox.addWidget(QLabel(_("Master Public Key")))
show_mpk(0) show_mpk(0)
vbox.addWidget(mpk_text) vbox.addWidget(mpk_text)
vbox.addStretch(1) vbox.addStretch(1)
vbox.addLayout(Buttons(CloseButton(dialog))) btns = run_hook('wallet_info_buttons', self, dialog) or Buttons(CloseButton(dialog))
vbox.addLayout(btns)
dialog.setLayout(vbox) dialog.setLayout(vbox)
dialog.exec_() dialog.exec_()

312
electrum/plugins/coldcard/basic_psbt.py

@ -0,0 +1,312 @@
#
# basic_psbt.py - yet another PSBT parser/serializer but used only for test cases.
#
# - history: taken from coldcard-firmware/testing/psbt.py
# - trying to minimize electrum code in here, and generally, dependancies.
#
import io, struct
from base64 import b64decode
from binascii import a2b_hex, b2a_hex
from struct import pack, unpack
from electrum.transaction import Transaction
# BIP-174 (aka PSBT) defined values
#
PSBT_GLOBAL_UNSIGNED_TX = (0)
PSBT_GLOBAL_XPUB = (1)
PSBT_IN_NON_WITNESS_UTXO = (0)
PSBT_IN_WITNESS_UTXO = (1)
PSBT_IN_PARTIAL_SIG = (2)
PSBT_IN_SIGHASH_TYPE = (3)
PSBT_IN_REDEEM_SCRIPT = (4)
PSBT_IN_WITNESS_SCRIPT = (5)
PSBT_IN_BIP32_DERIVATION = (6)
PSBT_IN_FINAL_SCRIPTSIG = (7)
PSBT_IN_FINAL_SCRIPTWITNESS = (8)
PSBT_OUT_REDEEM_SCRIPT = (0)
PSBT_OUT_WITNESS_SCRIPT = (1)
PSBT_OUT_BIP32_DERIVATION = (2)
# Serialization/deserialization tools
def ser_compact_size(l):
r = b""
if l < 253:
r = struct.pack("B", l)
elif l < 0x10000:
r = struct.pack("<BH", 253, l)
elif l < 0x100000000:
r = struct.pack("<BI", 254, l)
else:
r = struct.pack("<BQ", 255, l)
return r
def deser_compact_size(f):
try:
nit = f.read(1)[0]
except IndexError:
return None # end of file
if nit == 253:
nit = struct.unpack("<H", f.read(2))[0]
elif nit == 254:
nit = struct.unpack("<I", f.read(4))[0]
elif nit == 255:
nit = struct.unpack("<Q", f.read(8))[0]
return nit
def my_var_int(l):
# Bitcoin serialization of integers... directly into binary!
if l < 253:
return pack("B", l)
elif l < 0x10000:
return pack("<BH", 253, l)
elif l < 0x100000000:
return pack("<BI", 254, l)
else:
return pack("<BQ", 255, l)
class PSBTSection:
def __init__(self, fd=None, idx=None):
self.defaults()
self.my_index = idx
if not fd: return
while 1:
ks = deser_compact_size(fd)
if ks is None: break
if ks == 0: break
key = fd.read(ks)
vs = deser_compact_size(fd)
val = fd.read(vs)
kt = key[0]
self.parse_kv(kt, key[1:], val)
def serialize(self, fd, my_idx):
def wr(ktype, val, key=b''):
fd.write(ser_compact_size(1 + len(key)))
fd.write(bytes([ktype]) + key)
fd.write(ser_compact_size(len(val)))
fd.write(val)
self.serialize_kvs(wr)
fd.write(b'\0')
class BasicPSBTInput(PSBTSection):
def defaults(self):
self.utxo = None
self.witness_utxo = None
self.part_sigs = {}
self.sighash = None
self.bip32_paths = {}
self.redeem_script = None
self.witness_script = None
self.others = {}
def __eq__(a, b):
if a.sighash != b.sighash:
if a.sighash is not None and b.sighash is not None:
return False
rv = a.utxo == b.utxo and \
a.witness_utxo == b.witness_utxo and \
a.redeem_script == b.redeem_script and \
a.witness_script == b.witness_script and \
a.my_index == b.my_index and \
a.bip32_paths == b.bip32_paths and \
sorted(a.part_sigs.keys()) == sorted(b.part_sigs.keys())
# NOTE: equality test on signatures requires parsing DER stupidness
# and some maybe understanding of R/S values on curve that I don't have.
return rv
def parse_kv(self, kt, key, val):
if kt == PSBT_IN_NON_WITNESS_UTXO:
self.utxo = val
assert not key
elif kt == PSBT_IN_WITNESS_UTXO:
self.witness_utxo = val
assert not key
elif kt == PSBT_IN_PARTIAL_SIG:
self.part_sigs[key] = val
elif kt == PSBT_IN_SIGHASH_TYPE:
assert len(val) == 4
self.sighash = struct.unpack("<I", val)[0]
assert not key
elif kt == PSBT_IN_BIP32_DERIVATION:
self.bip32_paths[key] = val
elif kt == PSBT_IN_REDEEM_SCRIPT:
self.redeem_script = val
assert not key
elif kt == PSBT_IN_WITNESS_SCRIPT:
self.witness_script = val
assert not key
elif kt in ( PSBT_IN_REDEEM_SCRIPT,
PSBT_IN_WITNESS_SCRIPT,
PSBT_IN_FINAL_SCRIPTSIG,
PSBT_IN_FINAL_SCRIPTWITNESS):
assert not key
self.others[kt] = val
else:
raise KeyError(kt)
def serialize_kvs(self, wr):
if self.utxo:
wr(PSBT_IN_NON_WITNESS_UTXO, self.utxo)
if self.witness_utxo:
wr(PSBT_IN_WITNESS_UTXO, self.witness_utxo)
if self.redeem_script:
wr(PSBT_IN_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
wr(PSBT_IN_WITNESS_SCRIPT, self.witness_script)
for pk, val in sorted(self.part_sigs.items()):
wr(PSBT_IN_PARTIAL_SIG, val, pk)
if self.sighash is not None:
wr(PSBT_IN_SIGHASH_TYPE, struct.pack('<I', self.sighash))
for k in self.bip32_paths:
wr(PSBT_IN_BIP32_DERIVATION, self.bip32_paths[k], k)
for k in self.others:
wr(k, self.others[k])
class BasicPSBTOutput(PSBTSection):
def defaults(self):
self.redeem_script = None
self.witness_script = None
self.bip32_paths = {}
def __eq__(a, b):
return a.redeem_script == b.redeem_script and \
a.witness_script == b.witness_script and \
a.my_index == b.my_index and \
a.bip32_paths == b.bip32_paths
def parse_kv(self, kt, key, val):
if kt == PSBT_OUT_REDEEM_SCRIPT:
self.redeem_script = val
assert not key
elif kt == PSBT_OUT_WITNESS_SCRIPT:
self.witness_script = val
assert not key
elif kt == PSBT_OUT_BIP32_DERIVATION:
self.bip32_paths[key] = val
else:
raise ValueError(kt)
def serialize_kvs(self, wr):
if self.redeem_script:
wr(PSBT_OUT_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
wr(PSBT_OUT_WITNESS_SCRIPT, self.witness_script)
for k in self.bip32_paths:
wr(PSBT_OUT_BIP32_DERIVATION, self.bip32_paths[k], k)
class BasicPSBT:
"Just? parse and store"
def __init__(self):
self.txn = None
self.filename = None
self.parsed_txn = None
self.xpubs = []
self.inputs = []
self.outputs = []
def __eq__(a, b):
return a.txn == b.txn and \
len(a.inputs) == len(b.inputs) and \
len(a.outputs) == len(b.outputs) and \
all(a.inputs[i] == b.inputs[i] for i in range(len(a.inputs))) and \
all(a.outputs[i] == b.outputs[i] for i in range(len(a.outputs))) and \
sorted(a.xpubs) == sorted(b.xpubs)
def parse(self, raw, filename=None):
# auto-detect and decode Base64 and Hex.
if raw[0:10].lower() == b'70736274ff':
raw = a2b_hex(raw.strip())
if raw[0:6] == b'cHNidP':
raw = b64decode(raw)
assert raw[0:5] == b'psbt\xff', "bad magic"
self.filename = filename
with io.BytesIO(raw[5:]) as fd:
# globals
while 1:
ks = deser_compact_size(fd)
if ks is None: break
if ks == 0: break
key = fd.read(ks)
vs = deser_compact_size(fd)
val = fd.read(vs)
kt = key[0]
if kt == PSBT_GLOBAL_UNSIGNED_TX:
self.txn = val
self.parsed_txn = Transaction(val.hex())
num_ins = len(self.parsed_txn.inputs())
num_outs = len(self.parsed_txn.outputs())
elif kt == PSBT_GLOBAL_XPUB:
# key=(xpub) => val=(path)
self.xpubs.append( (key, val) )
else:
raise ValueError('unknown global key type: 0x%02x' % kt)
assert self.txn, 'missing reqd section'
self.inputs = [BasicPSBTInput(fd, idx) for idx in range(num_ins)]
self.outputs = [BasicPSBTOutput(fd, idx) for idx in range(num_outs)]
sep = fd.read(1)
assert sep == b''
return self
def serialize(self, fd):
def wr(ktype, val, key=b''):
fd.write(ser_compact_size(1 + len(key)))
fd.write(bytes([ktype]) + key)
fd.write(ser_compact_size(len(val)))
fd.write(val)
fd.write(b'psbt\xff')
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
for k,v in self.xpubs:
wr(PSBT_GLOBAL_XPUB, v, key=k)
# sep
fd.write(b'\0')
for idx, inp in enumerate(self.inputs):
inp.serialize(fd, idx)
for idx, outp in enumerate(self.outputs):
outp.serialize(fd, idx)
def as_bytes(self):
with io.BytesIO() as fd:
self.serialize(fd)
return fd.getvalue()
# EOF

391
electrum/plugins/coldcard/build_psbt.py

@ -0,0 +1,391 @@
#
# build_psbt.py - create a PSBT from (unsigned) transaction and keystore data.
#
import io, struct
from base64 import b64decode
from binascii import a2b_hex, b2a_hex
from struct import pack, unpack
from electrum.transaction import (Transaction, multisig_script, parse_redeemScript_multisig,
NotRecognizedRedeemScript)
from electrum.logging import get_logger
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Wallet
from electrum.keystore import xpubkey_to_pubkey, Xpub
from electrum.util import bfh, bh2u
from electrum.crypto import hash_160
from electrum.bitcoin import DecodeBase58Check
from .basic_psbt import (
PSBT_GLOBAL_UNSIGNED_TX, PSBT_GLOBAL_XPUB, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO,
PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, PSBT_IN_PARTIAL_SIG,
PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_REDEEM_SCRIPT)
from .basic_psbt import BasicPSBT
from electrum.logging import get_logger
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Wallet
from electrum.util import bfh, bh2u
from electrum.crypto import hash_160
from electrum.bitcoin import DecodeBase58Check
_logger = get_logger(__name__)
def xfp2str(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return b2a_hex(pack('<I', xfp)).decode('ascii').upper()
def xfp_from_xpub(xpub):
# sometime we need to BIP32 fingerprint value: 4 bytes of ripemd(sha256(pubkey))
kk = bfh(Xpub.get_pubkey_from_xpub(xpub, []))
assert len(kk) == 33
xfp, = unpack('<I', hash_160(kk)[0:4])
return xfp
def packed_xfp_path(xfp, text_path, int_path=[]):
# Convert text subkey derivation path into binary format needed for PSBT
# - binary LE32 values, first one is the fingerprint
rv = pack('<I', xfp)
for x in text_path.split('/'):
if x == 'm': continue
if x.endswith("'"):
x = int(x[:-1]) | 0x80000000
else:
x = int(x)
rv += pack('<I', x)
for x in int_path:
rv += pack('<I', x)
return rv
def unpacked_xfp_path(xfp, text_path):
# Convert text subkey derivation path into format needed for PSBT
# - binary LE32 values, first one is the fingerprint
# - but as ints, not bytes yet
rv = [xfp]
for x in text_path.split('/'):
if x == 'm': continue
if x.endswith("'"):
x = int(x[:-1]) | 0x80000000
else:
x = int(x)
rv.append(x)
return rv
def xfp_for_keystore(ks):
# Need the fingerprint of the MASTER key for a keystore we're playing with.
xfp = getattr(ks, 'ckcc_xfp', None)
if xfp is None:
xfp = xfp_from_xpub(ks.get_master_public_key())
setattr(ks, 'ckcc_xfp', xfp)
return xfp
# Serialization/deserialization tools
def ser_compact_size(l):
r = b""
if l < 253:
r = struct.pack("B", l)
elif l < 0x10000:
r = struct.pack("<BH", 253, l)
elif l < 0x100000000:
r = struct.pack("<BI", 254, l)
else:
r = struct.pack("<BQ", 255, l)
return r
def deser_compact_size(f):
try:
nit = f.read(1)[0]
except IndexError:
return None # end of file
if nit == 253:
nit = struct.unpack("<H", f.read(2))[0]
elif nit == 254:
nit = struct.unpack("<I", f.read(4))[0]
elif nit == 255:
nit = struct.unpack("<Q", f.read(8))[0]
return nit
def my_var_int(l):
# Bitcoin serialization of integers... directly into binary!
if l < 253:
return pack("B", l)
elif l < 0x10000:
return pack("<BH", 253, l)
elif l < 0x100000000:
return pack("<BI", 254, l)
else:
return pack("<BQ", 255, l)
def build_psbt(tx: Transaction, wallet: Wallet):
# Render a PSBT file, for possible upload to Coldcard.
#
# TODO this should be part of Wallet object, or maybe Transaction?
if getattr(tx, 'raw_psbt', False):
_logger.info('PSBT cache hit')
return tx.raw_psbt
inputs = tx.inputs()
if 'prev_tx' not in inputs[0]:
# fetch info about inputs, if needed?
# - needed during export PSBT flow, not normal online signing
wallet.add_hw_info(tx)
# wallet.add_hw_info installs this attr
assert tx.output_info is not None, 'need data about outputs'
# Build a map of all pubkeys needed as derivation from master XFP, in PSBT binary format
# 1) binary version of the common subpath for all keys
# m/ => fingerprint LE32
# a/b/c => ints
#
# 2) all used keys in transaction:
# - for all inputs and outputs (when its change back)
# - for all keystores, if multisig
#
subkeys = {}
for ks in wallet.get_keystores():
# XFP + fixed prefix for this keystore
ks_prefix = packed_xfp_path(xfp_for_keystore(ks), ks.get_derivation()[2:])
# all pubkeys needed for input signing
for xpubkey, derivation in ks.get_tx_derivations(tx).items():
pubkey = xpubkey_to_pubkey(xpubkey)
# assuming depth two, non-harded: change + index
aa, bb = derivation
assert 0 <= aa < 0x80000000 and 0 <= bb < 0x80000000
subkeys[bfh(pubkey)] = ks_prefix + pack('<II', aa, bb)
# all keys related to change outputs
for o in tx.outputs():
if o.address in tx.output_info:
# this address "is_mine" but might not be change (if I send funds to myself)
chg_path = tx.output_info.get(o.address).address_index
if chg_path[0] != 1 or len(chg_path) != 2:
# not change.
continue
pubkey = ks.derive_pubkey(True, chg_path[1])
subkeys[bfh(pubkey)] = ks_prefix + pack('<II', *chg_path)
for txin in inputs:
assert txin['type'] != 'coinbase', _("Coinbase not supported")
if txin['type'] in ['p2sh', 'p2wsh-p2sh', 'p2wsh']:
assert type(wallet) is Multisig_Wallet
# Construct PSBT from start to finish.
out_fd = io.BytesIO()
out_fd.write(b'psbt\xff')
def write_kv(ktype, val, key=b''):
# serialize helper: write w/ size and key byte
out_fd.write(my_var_int(1 + len(key)))
out_fd.write(bytes([ktype]) + key)
if isinstance(val, str):
val = bfh(val)
out_fd.write(my_var_int(len(val)))
out_fd.write(val)
# global section: just the unsigned txn
class CustomTXSerialization(Transaction):
@classmethod
def input_script(cls, txin, estimate_size=False):
return ''
unsigned = bfh(CustomTXSerialization(tx.serialize()).serialize_to_network(witness=False))
write_kv(PSBT_GLOBAL_UNSIGNED_TX, unsigned)
if type(wallet) is Multisig_Wallet:
# always put the xpubs into the PSBT, useful at least for checking
for xp, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):
xfp = xfp_for_keystore(ks)
dd = getattr(ks, 'derivation', 'm')
write_kv(PSBT_GLOBAL_XPUB, packed_xfp_path(xfp, dd), DecodeBase58Check(xp))
# end globals section
out_fd.write(b'\x00')
# inputs section
for txin in inputs:
if Transaction.is_segwit_input(txin):
utxo = txin['prev_tx'].outputs()[txin['prevout_n']]
spendable = txin['prev_tx'].serialize_output(utxo)
write_kv(PSBT_IN_WITNESS_UTXO, spendable)
else:
write_kv(PSBT_IN_NON_WITNESS_UTXO, str(txin['prev_tx']))
pubkeys, x_pubkeys = tx.get_sorted_pubkeys(txin)
pubkeys = [bfh(k) for k in pubkeys]
if type(wallet) is Multisig_Wallet:
# always need a redeem script for multisig
scr = Transaction.get_preimage_script(txin)
if 'p2wsh' in txin['type']:
# needed for both p2wsh-p2sh and p2wsh
write_kv(PSBT_IN_WITNESS_SCRIPT, bfh(scr))
else:
write_kv(PSBT_IN_REDEEM_SCRIPT, bfh(scr))
sigs = txin.get('signatures')
for pk_pos, (pubkey, x_pubkey) in enumerate(zip(pubkeys, x_pubkeys)):
if pubkey in subkeys:
# faster? case ... calculated above
write_kv(PSBT_IN_BIP32_DERIVATION, subkeys[pubkey], pubkey)
else:
# when an input is partly signed, tx.get_tx_derivations()
# doesn't include that keystore's value and yet we need it
# because we need to show a correct keypath...
assert x_pubkey[0:2] == 'ff', x_pubkey
for ks in wallet.get_keystores():
d = ks.get_pubkey_derivation(x_pubkey)
if d is not None:
ks_path = packed_xfp_path(xfp_for_keystore(ks), ks.get_derivation()[2:], d)
write_kv(PSBT_IN_BIP32_DERIVATION, ks_path, pubkey)
break
else:
raise AssertionError("no keystore for: %s" % x_pubkey)
if txin['type'] == 'p2wpkh-p2sh':
assert len(pubkeys) == 1, 'can be only one redeem script per input'
pa = hash_160(k)
assert len(pa) == 20
write_kv(PSBT_IN_REDEEM_SCRIPT, b'\x00\x14'+pa)
# optional? insert (partial) signatures that we already have
if sigs and sigs[pk_pos]:
write_kv(PSBT_IN_PARTIAL_SIG, bfh(sigs[pk_pos]), pubkey)
out_fd.write(b'\x00')
# outputs section
for o in tx.outputs():
# can be empty, but must be present, and helpful to show change inputs
# wallet.add_hw_info() adds some data about change outputs into tx.output_info
if o.address in tx.output_info:
# this address "is_mine" but might not be change (if I send funds to myself)
output_info = tx.output_info.get(o.address)
chg_path, master_xpubs = output_info.address_index, output_info.sorted_xpubs
if chg_path[0] == 1 and len(chg_path) == 2:
# it is a change output (based on our standard derivation path)
pubkeys = [bfh(i) for i in wallet.get_public_keys(o.address)]
# always need a redeem script for multisig
if type(wallet) is Multisig_Wallet:
scr = multisig_script([bh2u(i) for i in sorted(pubkeys)], wallet.m)
write_kv(PSBT_OUT_REDEEM_SCRIPT, bfh(scr))
# document change output's bip32 derivation(s)
for pubkey in pubkeys:
sk = subkeys[pubkey]
write_kv(PSBT_OUT_BIP32_DERIVATION, sk, pubkey)
if output_info.script_type == 'p2wpkh-p2sh':
assert len(pa) == 20
assert len(pubkeys) == 1
pa = hash_160(pubkey)
write_kv(PSBT_OUT_REDEEM_SCRIPT, b'\x00\x14' + pa)
out_fd.write(b'\x00')
# capture for later use
tx.raw_psbt = out_fd.getvalue()
return tx.raw_psbt
def recover_tx_from_psbt(first: BasicPSBT, wallet: Wallet) -> Transaction:
# Take a PSBT object and re-construct the Electrum transaction object.
# - does not include signatures, see merge_sigs_from_psbt
# - any PSBT in the group could be used for this purpose; all must share tx details
tx = Transaction(first.txn.hex())
tx.deserialize(force_full_parse=True)
# .. add back some data that's been preserved in the PSBT, but isn't part of
# of the unsigned bitcoin txn
tx.is_partial_originally = True
for idx, inp in enumerate(tx.inputs()):
scr = first.inputs[idx].redeem_script or first.inputs[idx].witness_script
# XXX should use transaction.py parse_scriptSig() here!
if scr:
try:
M, N, __, pubkeys, __ = parse_redeemScript_multisig(scr)
except NotRecognizedRedeemScript:
# limitation: we can only handle M-of-N multisig here
raise ValueError("Cannot handle non M-of-N multisig input")
inp['pubkeys'] = pubkeys
inp['x_pubkeys'] = pubkeys
inp['num_sig'] = M
inp['type'] = 'p2wsh' if first.inputs[idx].witness_script else 'p2sh'
# bugfix: transaction.py:parse_input() puts empty dict here, but need a list
inp['signatures'] = [None] * N
if 'prev_tx' not in inp:
# fetch info about inputs' previous txn
wallet.add_hw_info(tx)
if 'value' not in inp:
# we'll need to know the value of the outpts used as part
# of the witness data, much later...
inp['value'] = inp['prev_tx'].outputs()[inp['prevout_n']].value
return tx
def merge_sigs_from_psbt(tx: Transaction, psbt: BasicPSBT):
# Take new signatures from PSBT, and merge into in-memory transaction object.
# - "we trust everyone here" ... no validation/checks
count = 0
for inp_idx, inp in enumerate(psbt.inputs):
if not inp.part_sigs:
continue
scr = inp.redeem_script or inp.witness_script
# need to map from pubkey to signing position in redeem script
M, N, _, pubkeys, _ = parse_redeemScript_multisig(scr)
#assert (M, N) == (wallet.m, wallet.n)
for sig_pk in inp.part_sigs:
pk_pos = pubkeys.index(sig_pk.hex())
tx.add_signature_to_txin(inp_idx, pk_pos, inp.part_sigs[sig_pk].hex())
count += 1
#print("#%d: sigs = %r" % (inp_idx, tx.inputs()[inp_idx]['signatures']))
# reset serialization of TX
tx.raw = tx.serialize()
tx.raw_psbt = None
return count
# EOF

361
electrum/plugins/coldcard/coldcard.py

@ -8,18 +8,22 @@ import traceback
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes
from electrum.i18n import _ from electrum.i18n import _
from electrum.plugin import Device from electrum.plugin import Device, hook
from electrum.keystore import Hardware_KeyStore, xpubkey_to_pubkey, Xpub from electrum.keystore import Hardware_KeyStore
from electrum.transaction import Transaction from electrum.transaction import Transaction, multisig_script
from electrum.wallet import Standard_Wallet from electrum.wallet import Standard_Wallet, Multisig_Wallet, Wallet
from electrum.crypto import hash_160 from electrum.crypto import hash_160
from electrum.util import bfh, bh2u, versiontuple, UserFacingException from electrum.util import bfh, bh2u, versiontuple, UserFacingException
from electrum.base_wizard import ScriptTypeNotSupported from electrum.base_wizard import ScriptTypeNotSupported
from electrum.logging import get_logger from electrum.logging import get_logger
from electrum.bitcoin import DecodeBase58Check
from ..hw_wallet import HW_PluginBase from ..hw_wallet import HW_PluginBase
from ..hw_wallet.plugin import LibraryFoundButUnusable from ..hw_wallet.plugin import LibraryFoundButUnusable, only_hook_if_libraries_available
from .basic_psbt import BasicPSBT
from .build_psbt import (build_psbt, xfp2str, unpacked_xfp_path,
merge_sigs_from_psbt, xfp_for_keystore)
_logger = get_logger(__name__) _logger = get_logger(__name__)
@ -30,10 +34,10 @@ try:
from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN, from ckcc.constants import (MAX_MSG_LEN, MAX_BLK_LEN, MSG_SIGNING_MAX_LENGTH, MAX_TXN_LEN,
AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH) AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH)
from ckcc.constants import ( #from ckcc.constants import (
PSBT_GLOBAL_UNSIGNED_TX, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO, #PSBT_GLOBAL_UNSIGNED_TX, PSBT_GLOBAL_XPUB, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO,
PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, #PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT,
PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_REDEEM_SCRIPT) #PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_REDEEM_SCRIPT)
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, CKCC_SIMULATOR_PATH
@ -60,26 +64,6 @@ except ImportError:
CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa CKCC_SIMULATED_PID = CKCC_PID ^ 0x55aa
def my_var_int(l):
# Bitcoin serialization of integers... directly into binary!
if l < 253:
return pack("B", l)
elif l < 0x10000:
return pack("<BH", 253, l)
elif l < 0x100000000:
return pack("<BI", 254, l)
else:
return pack("<BQ", 255, l)
def xfp_from_xpub(xpub):
# sometime we need to BIP32 fingerprint value: 4 bytes of ripemd(sha256(pubkey))
# UNTESTED
kk = bfh(Xpub.get_pubkey_from_xpub(xpub, []))
assert len(kk) == 33
xfp, = unpack('<I', hash_160(kk)[0:4])
return xfp
class CKCCClient: class CKCCClient:
# Challenge: I haven't found anywhere that defines a base class for this 'client', # Challenge: I haven't found anywhere that defines a base class for this 'client',
# nor an API (interface) to be met. Winging it. Gets called from lib/plugins.py mostly? # nor an API (interface) to be met. Winging it. Gets called from lib/plugins.py mostly?
@ -105,24 +89,27 @@ class CKCCClient:
# should expect. It's also kinda slow. # should expect. It's also kinda slow.
def __repr__(self): def __repr__(self):
return '<CKCCClient: xfp=%08x label=%r>' % (self.dev.master_fingerprint, return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint),
self.label()) self.label())
def verify_connection(self, expected_xfp, expected_xpub): def verify_connection(self, expected_xfp, expected_xpub=None):
ex = (expected_xfp, expected_xpub) ex = (expected_xfp, expected_xpub)
if self._expected_device == ex: if self._expected_device == ex:
# all is as expected # all is as expected
return return
if expected_xpub is None:
expected_xpub = self.dev.master_xpub
if ( (self._expected_device is not None) if ( (self._expected_device is not None)
or (self.dev.master_fingerprint != expected_xfp) or (self.dev.master_fingerprint != expected_xfp)
or (self.dev.master_xpub != expected_xpub)): or (self.dev.master_xpub != expected_xpub)):
# probably indicating programing error, not hacking # probably indicating programing error, not hacking
_logger.info(f"xpubs. reported by device: {self.dev.master_xpub}. " _logger.info(f"xpubs. reported by device: {self.dev.master_xpub}. "
f"stored in file: {expected_xpub}") f"stored in file: {expected_xpub}")
raise RuntimeError("Expecting 0x%08x but that's not what's connected?!" % raise RuntimeError("Expecting %s but that's not what's connected?!" %
expected_xfp) xfp2str(expected_xfp))
# check signature over session key # check signature over session key
# - mitm might have lied about xfp and xpub up to here # - mitm might have lied about xfp and xpub up to here
@ -132,10 +119,13 @@ class CKCCClient:
self._expected_device = ex self._expected_device = ex
if not getattr(self, 'ckcc_xpub', None):
self.ckcc_xpub = expected_xpub
_logger.info("Successfully verified against MiTM") _logger.info("Successfully verified against MiTM")
def is_pairable(self): def is_pairable(self):
# can't do anything w/ devices that aren't setup (but not normally reachable) # can't do anything w/ devices that aren't setup (this code not normally reachable)
return bool(self.dev.master_xpub) return bool(self.dev.master_xpub)
def timeout(self, cutoff): def timeout(self, cutoff):
@ -155,12 +145,12 @@ class CKCCClient:
# not be encrypted, so better for privacy if based on xpub/fingerprint rather than # not be encrypted, so better for privacy if based on xpub/fingerprint rather than
# USB serial number. # USB serial number.
if self.dev.is_simulator: if self.dev.is_simulator:
lab = 'Coldcard Simulator 0x%08x' % self.dev.master_fingerprint lab = 'Coldcard Simulator ' + xfp2str(self.dev.master_fingerprint)
elif not self.dev.master_fingerprint: elif not self.dev.master_fingerprint:
# failback; not expected # failback; not expected
lab = 'Coldcard #' + self.dev.serial lab = 'Coldcard #' + self.dev.serial
else: else:
lab = 'Coldcard 0x%08x' % self.dev.master_fingerprint lab = 'Coldcard ' + xfp2str(self.dev.master_fingerprint)
# Hack zone: during initial setup I need the xfp and master xpub but # Hack zone: during initial setup I need the xfp and master xpub but
# very few objects are passed between the various steps of base_wizard. # very few objects are passed between the various steps of base_wizard.
@ -210,9 +200,13 @@ class CKCCClient:
raise RuntimeError("Communication trouble with Coldcard") raise RuntimeError("Communication trouble with Coldcard")
def show_address(self, path, addr_fmt): def show_address(self, path, addr_fmt):
# prompt user w/ addres, also returns it immediately. # prompt user w/ address, also returns it immediately.
return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)
def show_p2sh_address(self, *args, **kws):
# prompt user w/ p2sh address, also returns it immediately.
return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None)
def get_version(self): def get_version(self):
# gives list of strings # gives list of strings
return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n') return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n')
@ -262,22 +256,27 @@ class Coldcard_KeyStore(Hardware_KeyStore):
self.force_watching_only = False self.force_watching_only = False
self.ux_busy = False self.ux_busy = False
# for multisig I need to know what wallet this keystore is part of
# this is captured by hooling make_unsigned_transaction
self.my_wallet = None
# Seems like only the derivation path and resulting **derived** xpub is stored in # Seems like only the derivation path and resulting **derived** xpub is stored in
# the wallet file... however, we need to know at least the fingerprint of the master # the wallet file... however, we need to know at least the fingerprint of the master
# xpub to verify against MiTM, and also so we can put the right value into the subkey paths # xpub to verify against MiTM, and also so we can put the right value into the subkey paths
# of PSBT files that might be generated offline. # of PSBT files that might be generated offline.
# - save the fingerprint of the master xpub, as "xfp" # - save the fingerprint of the master xpub, as "xfp"
# - it's a LE32 int, but hex more natural way to see it # - it's a LE32 int, but hex BE32 is more natural way to view it
# - device reports these value during encryption setup process # - device reports these value during encryption setup process
# - full xpub value now optional
lab = d['label'] lab = d['label']
if hasattr(lab, 'xfp'): if hasattr(lab, 'xfp'):
# initial setup # initial setup
self.ckcc_xfp = lab.xfp self.ckcc_xfp = lab.xfp
self.ckcc_xpub = lab.xpub self.ckcc_xpub = getattr(lab, 'xpub', None)
else: else:
# wallet load: fatal if missing, we need them! # wallet load: fatal if missing, we need them!
self.ckcc_xfp = d['ckcc_xfp'] self.ckcc_xfp = d['ckcc_xfp']
self.ckcc_xpub = d['ckcc_xpub'] self.ckcc_xpub = d.get('ckcc_xpub', None)
def dump(self): def dump(self):
# our additions to the stored data about keystore -- only during creation? # our additions to the stored data about keystore -- only during creation?
@ -294,6 +293,7 @@ class Coldcard_KeyStore(Hardware_KeyStore):
def get_client(self): def get_client(self):
# called when user tries to do something like view address, sign somthing. # called when user tries to do something like view address, sign somthing.
# - not called during probing/setup # - not called during probing/setup
# - will fail if indicated device can't produce the xpub (at derivation) expected
rv = self.plugin.get_client(self) rv = self.plugin.get_client(self)
if rv: if rv:
rv.verify_connection(self.ckcc_xfp, self.ckcc_xpub) rv.verify_connection(self.ckcc_xfp, self.ckcc_xpub)
@ -377,162 +377,35 @@ class Coldcard_KeyStore(Hardware_KeyStore):
# give empty bytes for error cases; it seems to clear the old signature box # give empty bytes for error cases; it seems to clear the old signature box
return b'' return b''
def build_psbt(self, tx: Transaction, wallet=None, xfp=None):
# Render a PSBT file, for upload to Coldcard.
#
if xfp is None:
# need fingerprint of MASTER xpub, not the derived key
xfp = self.ckcc_xfp
inputs = tx.inputs()
if 'prev_tx' not in inputs[0]:
# fetch info about inputs, if needed?
# - needed during export PSBT flow, not normal online signing
assert wallet, 'need wallet reference'
wallet.add_hw_info(tx)
# wallet.add_hw_info installs this attr
assert tx.output_info is not None, 'need data about outputs'
# Build map of pubkey needed as derivation from master, in PSBT binary format
# 1) binary version of the common subpath for all keys
# m/ => fingerprint LE32
# a/b/c => ints
base_path = pack('<I', xfp)
for x in self.get_derivation()[2:].split('/'):
if x.endswith("'"):
x = int(x[:-1]) | 0x80000000
else:
x = int(x)
base_path += pack('<I', x)
# 2) all used keys in transaction
subkeys = {}
derivations = self.get_tx_derivations(tx)
for xpubkey in derivations:
pubkey = xpubkey_to_pubkey(xpubkey)
# assuming depth two, non-harded: change + index
aa, bb = derivations[xpubkey]
assert 0 <= aa < 0x80000000
assert 0 <= bb < 0x80000000
subkeys[bfh(pubkey)] = base_path + pack('<II', aa, bb)
for txin in inputs:
if txin['type'] == 'coinbase':
self.give_error("Coinbase not supported")
if txin['type'] in ['p2sh', 'p2wsh-p2sh', 'p2wsh']:
self.give_error('No support yet for inputs of type: ' + txin['type'])
# Construct PSBT from start to finish.
out_fd = io.BytesIO()
out_fd.write(b'psbt\xff')
def write_kv(ktype, val, key=b''):
# serialize helper: write w/ size and key byte
out_fd.write(my_var_int(1 + len(key)))
out_fd.write(bytes([ktype]) + key)
if isinstance(val, str):
val = bfh(val)
out_fd.write(my_var_int(len(val)))
out_fd.write(val)
# global section: just the unsigned txn
class CustomTXSerialization(Transaction):
@classmethod
def input_script(cls, txin, estimate_size=False):
return ''
unsigned = bfh(CustomTXSerialization(tx.serialize()).serialize_to_network(witness=False))
write_kv(PSBT_GLOBAL_UNSIGNED_TX, unsigned)
# end globals section
out_fd.write(b'\x00')
# inputs section
for txin in inputs:
if Transaction.is_segwit_input(txin):
utxo = txin['prev_tx'].outputs()[txin['prevout_n']]
spendable = txin['prev_tx'].serialize_output(utxo)
write_kv(PSBT_IN_WITNESS_UTXO, spendable)
else:
write_kv(PSBT_IN_NON_WITNESS_UTXO, str(txin['prev_tx']))
pubkeys, x_pubkeys = tx.get_sorted_pubkeys(txin)
pubkeys = [bfh(k) for k in pubkeys]
for k in pubkeys:
write_kv(PSBT_IN_BIP32_DERIVATION, subkeys[k], k)
if txin['type'] == 'p2wpkh-p2sh':
assert len(pubkeys) == 1, 'can be only one redeem script per input'
pa = hash_160(k)
assert len(pa) == 20
write_kv(PSBT_IN_REDEEM_SCRIPT, b'\x00\x14'+pa)
out_fd.write(b'\x00')
# outputs section
for o in tx.outputs():
# can be empty, but must be present, and helpful to show change inputs
# wallet.add_hw_info() adds some data about change outputs into tx.output_info
if o.address in tx.output_info:
# this address "is_mine" but might not be change (I like to sent to myself)
output_info = tx.output_info.get(o.address)
index, xpubs = output_info.address_index, output_info.sorted_xpubs
if index[0] == 1 and len(index) == 2:
# it is a change output (based on our standard derivation path)
assert len(xpubs) == 1 # not expecting multisig
xpubkey = xpubs[0]
# document its bip32 derivation in output section
aa, bb = index
assert 0 <= aa < 0x80000000
assert 0 <= bb < 0x80000000
deriv = base_path + pack('<II', aa, bb)
pubkey = bfh(self.get_pubkey_from_xpub(xpubkey, index))
write_kv(PSBT_OUT_BIP32_DERIVATION, deriv, pubkey)
if output_info.script_type == 'p2wpkh-p2sh':
pa = hash_160(pubkey)
assert len(pa) == 20
write_kv(PSBT_OUT_REDEEM_SCRIPT, b'\x00\x14' + pa)
out_fd.write(b'\x00')
return out_fd.getvalue()
@wrap_busy @wrap_busy
def sign_transaction(self, tx, password): def sign_transaction(self, tx: Transaction, password):
# Build a PSBT in memory, upload it for signing. # Build a PSBT in memory, upload it for signing.
# - we can also work offline (without paired device present) # - we can also work offline (without paired device present)
if tx.is_complete(): if tx.is_complete():
return return
assert self.my_wallet, "Not clear which wallet associated with this Coldcard"
client = self.get_client() client = self.get_client()
if 0:
from pprint import pprint
for n,i in enumerate(tx.inputs()):
print('[%d]: ' % n, end='')
pprint(i)
assert client.dev.master_fingerprint == self.ckcc_xfp assert client.dev.master_fingerprint == self.ckcc_xfp
raw_psbt = self.build_psbt(tx) # makes PSBT required
raw_psbt = build_psbt(tx, self.my_wallet)
#open('debug.psbt', 'wb').write(out_fd.getvalue()) cc_finalize = not (type(self.my_wallet) is Multisig_Wallet)
try: try:
try: try:
self.handler.show_message("Authorize Transaction...") self.handler.show_message("Authorize Transaction...")
client.sign_transaction_start(raw_psbt, True) client.sign_transaction_start(raw_psbt, cc_finalize)
while 1: while 1:
# How to kill some time, without locking UI? # How to kill some time, without locking UI?
@ -545,7 +418,7 @@ class Coldcard_KeyStore(Hardware_KeyStore):
rlen, rsha = resp rlen, rsha = resp
# download the resulting txn. # download the resulting txn.
new_raw = client.download_file(rlen, rsha) raw_resp = client.download_file(rlen, rsha)
finally: finally:
self.handler.finished() self.handler.finished()
@ -559,8 +432,18 @@ class Coldcard_KeyStore(Hardware_KeyStore):
self.give_error(e, True) self.give_error(e, True)
return return
# trust the coldcard to re-searilize final product right? if cc_finalize:
tx.update(bh2u(new_raw)) # We trust the coldcard to re-serialize final transaction ready to go
tx.update(bh2u(raw_resp))
else:
# apply partial signatures back into txn
psbt = BasicPSBT()
psbt.parse(raw_resp, client.label())
merge_sigs_from_psbt(tx, psbt)
# caller's logic looks at tx now and if it's sufficiently signed,
# will send it if that's the user's intent.
@staticmethod @staticmethod
def _encode_txin_type(txin_type): def _encode_txin_type(txin_type):
@ -593,11 +476,30 @@ class Coldcard_KeyStore(Hardware_KeyStore):
self.logger.exception('') self.logger.exception('')
self.handler.show_error(exc) self.handler.show_error(exc)
@wrap_busy
def show_p2sh_address(self, M, script, xfp_paths, txin_type):
client = self.get_client()
addr_fmt = self._encode_txin_type(txin_type)
try:
try:
self.handler.show_message(_("Showing address ..."))
dev_addr = client.show_p2sh_address(M, xfp_paths, script, addr_fmt=addr_fmt)
# we could double check address here
finally:
self.handler.finished()
except CCProtoError as exc:
self.logger.exception('Error showing address')
self.handler.show_error('{}\n\n{}'.format(
_('Error showing address') + ':', str(exc)))
except BaseException as exc:
self.logger.exception('')
self.handler.show_error(exc)
class ColdcardPlugin(HW_PluginBase): class ColdcardPlugin(HW_PluginBase):
keystore_class = Coldcard_KeyStore keystore_class = Coldcard_KeyStore
minimum_library = (0, 7, 2) minimum_library = (0, 7, 7)
client = None client = None
DEVICE_IDS = [ DEVICE_IDS = [
@ -605,8 +507,7 @@ class ColdcardPlugin(HW_PluginBase):
(COINKITE_VID, CKCC_SIMULATED_PID) (COINKITE_VID, CKCC_SIMULATED_PID)
] ]
#SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh') SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
SUPPORTED_XTYPES = ('standard', 'p2wpkh', 'p2wpkh-p2sh')
def __init__(self, parent, config, name): def __init__(self, parent, config, name):
HW_PluginBase.__init__(self, parent, config, name) HW_PluginBase.__init__(self, parent, config, name)
@ -682,31 +583,103 @@ class ColdcardPlugin(HW_PluginBase):
return xpub return xpub
def get_client(self, keystore, force_pair=True): def get_client(self, keystore, force_pair=True):
# All client interaction should not be in the main GUI thread # Acquire a connection to the hardware device (via USB)
devmgr = self.device_manager() devmgr = self.device_manager()
handler = keystore.handler handler = keystore.handler
with devmgr.hid_lock: with devmgr.hid_lock:
client = devmgr.client_for_keystore(self, handler, keystore, force_pair) client = devmgr.client_for_keystore(self, handler, keystore, force_pair)
# returns the client for a given keystore. can use xpub
#if client:
# client.used()
if client is not None: if client is not None:
client.ping_check() client.ping_check()
return client return client
@staticmethod
def export_ms_wallet(wallet, fp, name):
# Build the text file Coldcard needs to understand the multisig wallet
# it is participating in. All involved Coldcards can share same file.
print('# Exported from Electrum', file=fp)
print(f'Name: {name:.20s}', file=fp)
print(f'Policy: {wallet.m} of {wallet.n}', file=fp)
print(f'Format: {wallet.txin_type.upper()}' , file=fp)
xpubs = []
derivs = set()
for xp, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):
xfp = xfp_for_keystore(ks)
dd = getattr(ks, 'derivation', 'm')
xpubs.append( (xfp2str(xfp), xp, dd) )
derivs.add(dd)
# Derivation doesn't matter too much to the Coldcard, since it
# uses key path data from PSBT or USB request as needed. However,
# if there is a clear value, provide it.
if len(derivs) == 1:
print("Derivation: " + derivs.pop(), file=fp)
print('', file=fp)
assert len(xpubs) == wallet.n
for xfp, xp, dd in xpubs:
if derivs:
# show as a comment if unclear
print(f'# derivation: {dd}', file=fp)
print(f'{xfp}: {xp}\n', file=fp)
def show_address(self, wallet, address, keystore=None): def show_address(self, wallet, address, keystore=None):
if keystore is None: if keystore is None:
keystore = wallet.get_keystore() keystore = wallet.get_keystore()
if not self.show_address_helper(wallet, address, keystore): if not self.show_address_helper(wallet, address, keystore):
return return
txin_type = wallet.get_txin_type(address)
# Standard_Wallet => not multisig, must be bip32 # Standard_Wallet => not multisig, must be bip32
if type(wallet) is not Standard_Wallet: if type(wallet) is Standard_Wallet:
sequence = wallet.get_address_index(address)
keystore.show_address(sequence, txin_type)
elif type(wallet) is Multisig_Wallet:
# More involved for P2SH/P2WSH addresses: need M, and all public keys, and their
# derivation paths. Must construct script, and track fingerprints+paths for
# all those keys
pubkeys = wallet.get_public_keys(address)
xfps = []
for xp, ks in zip(wallet.get_master_public_keys(), wallet.get_keystores()):
path = "%s/%d/%d" % (getattr(ks, 'derivation', 'm'),
*wallet.get_address_index(address))
# need master XFP for each co-signers
ks_xfp = xfp_for_keystore(ks)
xfps.append(unpacked_xfp_path(ks_xfp, path))
# put into BIP45 (sorted) order
pkx = list(sorted(zip(pubkeys, xfps)))
script = bfh(multisig_script([pk for pk,xfp in pkx], wallet.m))
keystore.show_p2sh_address(wallet.m, script, [xfp for pk,xfp in pkx], txin_type)
else:
keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device)) keystore.handler.show_error(_('This function is only available for standard wallets when using {}.').format(self.device))
return return
sequence = wallet.get_address_index(address) @classmethod
txin_type = wallet.get_txin_type(address) def link_wallet(cls, wallet):
keystore.show_address(sequence, txin_type) # PROBLEM: wallet.sign_transaction() does not pass in the wallet to the individual
# keystores, and we need to know about our co-signers at that time.
for ks in wallet.get_keystores():
if type(ks) == Coldcard_KeyStore:
if not ks.my_wallet:
ks.my_wallet = wallet
@hook
def make_unsigned_transaction(self, wallet, tx):
# - capture wallet containing each keystore early in the process
self.link_wallet(wallet)
# EOF # EOF

210
electrum/plugins/coldcard/qt.py

@ -1,18 +1,27 @@
import time import time, os
from functools import partial from functools import partial
from PyQt5.QtCore import Qt, pyqtSignal from PyQt5.QtCore import Qt, pyqtSignal
from PyQt5.QtWidgets import QPushButton, QLabel, QVBoxLayout, QWidget, QGridLayout from PyQt5.QtWidgets import QPushButton, QLabel, QVBoxLayout, QWidget, QGridLayout
from PyQt5.QtWidgets import QFileDialog
from electrum.i18n import _ from electrum.i18n import _
from electrum.plugin import hook from electrum.plugin import hook
from electrum.wallet import Standard_Wallet from electrum.wallet import Standard_Wallet, Multisig_Wallet
from electrum.gui.qt.util import WindowModalDialog, CloseButton, get_parent_main_window from electrum.gui.qt.util import WindowModalDialog, CloseButton, get_parent_main_window, Buttons
from electrum.transaction import Transaction
from .coldcard import ColdcardPlugin from .coldcard import ColdcardPlugin, xfp2str
from ..hw_wallet.qt import QtHandlerBase, QtPluginBase from ..hw_wallet.qt import QtHandlerBase, QtPluginBase
from ..hw_wallet.plugin import only_hook_if_libraries_available from ..hw_wallet.plugin import only_hook_if_libraries_available
from binascii import a2b_hex
from base64 import b64encode, b64decode
from .basic_psbt import BasicPSBT
from .build_psbt import build_psbt, merge_sigs_from_psbt, recover_tx_from_psbt
CC_DEBUG = False
class Plugin(ColdcardPlugin, QtPluginBase): class Plugin(ColdcardPlugin, QtPluginBase):
icon_unpaired = "coldcard_unpaired.png" icon_unpaired = "coldcard_unpaired.png"
@ -24,22 +33,62 @@ class Plugin(ColdcardPlugin, QtPluginBase):
@only_hook_if_libraries_available @only_hook_if_libraries_available
@hook @hook
def receive_menu(self, menu, addrs, wallet): def receive_menu(self, menu, addrs, wallet):
if type(wallet) is not Standard_Wallet: # Context menu on each address in the Addresses Tab, right click...
return
keystore = wallet.get_keystore() if type(wallet) is Standard_Wallet:
keystore = wallet.get_keystore()
else:
# find if any devices are connected and ready to go, use first of those.
for ks in wallet.get_keystores():
if ks.has_usable_connection_with_device():
keystore = ks
break
else:
# don't hook into menu
return
if type(keystore) == self.keystore_class and len(addrs) == 1: if type(keystore) == self.keystore_class and len(addrs) == 1:
def show_address(): def show_address():
keystore.thread.add(partial(self.show_address, wallet, addrs[0])) keystore.thread.add(partial(self.show_address, wallet, addrs[0], keystore=keystore))
menu.addAction(_("Show on Coldcard"), show_address) menu.addAction(_("Show on Coldcard ({})").format(keystore.label), show_address)
@only_hook_if_libraries_available
@hook
def wallet_info_buttons(self, main_window, dialog):
# user is about to see the "Wallet Information" dialog
# - add a button if multisig wallet, and a Coldcard is a cosigner.
wallet = main_window.wallet
if type(wallet) is not Multisig_Wallet:
return
if not any(type(ks) == self.keystore_class for ks in wallet.get_keystores()):
# doesn't involve a Coldcard wallet, hide feature
return
btn = QPushButton(_("Export for Coldcard"))
btn.clicked.connect(lambda unused: self.export_multisig_setup(main_window, wallet))
return Buttons(btn, CloseButton(dialog))
def export_multisig_setup(self, main_window, wallet):
basename = wallet.basename().rsplit('.', 1)[0] # trim .json
name = f'{basename}-cc-export.txt'.replace(' ', '-')
fileName = main_window.getSaveFileName(_("Select where to save the setup file"),
name, "*.txt")
if fileName:
with open(fileName, "wt") as f:
ColdcardPlugin.export_ms_wallet(wallet, f, basename)
main_window.show_message(_("Wallet setup file exported successfully"))
@only_hook_if_libraries_available @only_hook_if_libraries_available
@hook @hook
def transaction_dialog(self, dia): def transaction_dialog(self, dia):
# see gui/qt/transaction_dialog.py # see gui/qt/transaction_dialog.py
keystore = dia.wallet.get_keystore() # if not a Coldcard wallet, hide feature
if type(keystore) != self.keystore_class: if not any(type(ks) == self.keystore_class for ks in dia.wallet.get_keystores()):
# not a Coldcard wallet, hide feature
return return
# - add a new button, near "export" # - add a new button, near "export"
@ -65,23 +114,110 @@ class Plugin(ColdcardPlugin, QtPluginBase):
assert type(keystore) == self.keystore_class assert type(keystore) == self.keystore_class
# convert to PSBT # convert to PSBT
raw_psbt = keystore.build_psbt(tx, wallet=dia.wallet) build_psbt(tx, dia.wallet)
name = (dia.wallet.basename() + time.strftime('-%y%m%d-%H%M.psbt')).replace(' ', '-') name = (dia.wallet.basename() + time.strftime('-%y%m%d-%H%M.psbt'))\
.replace(' ', '-').replace('.json', '')
fileName = dia.main_window.getSaveFileName(_("Select where to save the PSBT file"), fileName = dia.main_window.getSaveFileName(_("Select where to save the PSBT file"),
name, "*.psbt") name, "*.psbt")
if fileName: if fileName:
with open(fileName, "wb+") as f: with open(fileName, "wb+") as f:
f.write(raw_psbt) f.write(tx.raw_psbt)
dia.show_message(_("Transaction exported successfully")) dia.show_message(_("Transaction exported successfully"))
dia.saved = True dia.saved = True
def show_settings_dialog(self, window, keystore): def show_settings_dialog(self, window, keystore):
# When they click on the icon for CC we come here. # When they click on the icon for CC we come here.
device_id = self.choose_device(window, keystore) # - doesn't matter if device not connected, continue
if device_id: CKCCSettingsDialog(window, self, keystore).exec_()
CKCCSettingsDialog(window, self, keystore, device_id).exec_()
@hook
def init_menubar_tools(self, main_window, tools_menu):
# add some PSBT-related tools to the "Load Transaction" menu.
rt = main_window.raw_transaction_menu
wallet = main_window.wallet
rt.addAction(_("From &PSBT File or Files"), lambda: self.psbt_combiner(main_window, wallet))
def psbt_combiner(self, window, wallet):
title = _("Select the PSBT file to load or PSBT files to combine")
directory = ''
fnames, __ = QFileDialog.getOpenFileNames(window, title, directory, "PSBT Files (*.psbt)")
psbts = []
for fn in fnames:
try:
with open(fn, "rb") as f:
raw = f.read()
psbt = BasicPSBT()
psbt.parse(raw, fn)
psbts.append(psbt)
except (AssertionError, ValueError, IOError, os.error) as reason:
window.show_critical(_("Electrum was unable to open your PSBT file") + "\n" + str(reason), title=_("Unable to read file"))
return
warn = []
if not psbts: return # user picked nothing
# Consistency checks and warnings.
try:
first = psbts[0]
for p in psbts:
fn = os.path.split(p.filename)[1]
assert (p.txn == first.txn), \
"All must relate to the same unsigned transaction."
for idx, inp in enumerate(p.inputs):
if not inp.part_sigs:
warn.append(fn + ':\n ' + _("No partial signatures found for input #%d") % idx)
assert first.inputs[idx].redeem_script == inp.redeem_script, "Mismatched redeem scripts"
assert first.inputs[idx].witness_script == inp.witness_script, "Mismatched witness"
except AssertionError as exc:
# Fatal errors stop here.
window.show_critical(str(exc),
title=_("Unable to combine PSBT files, check: ")+p.filename)
return
if warn:
# Lots of potential warnings...
window.show_warning('\n\n'.join(warn), title=_("PSBT warnings"))
# Construct an Electrum transaction object from data in first PSBT file.
try:
tx = recover_tx_from_psbt(first, wallet)
except BaseException as exc:
if CC_DEBUG:
from PyQt5.QtCore import pyqtRemoveInputHook; pyqtRemoveInputHook()
import pdb; pdb.post_mortem()
window.show_critical(str(exc), title=_("Unable to understand PSBT file"))
return
# Combine the signatures from all the PSBTS (may do nothing if unsigned PSBTs)
for p in psbts:
try:
merge_sigs_from_psbt(tx, p)
except BaseException as exc:
if CC_DEBUG:
from PyQt5.QtCore import pyqtRemoveInputHook; pyqtRemoveInputHook()
import pdb; pdb.post_mortem()
window.show_critical("Unable to merge signatures: " + str(exc),
title=_("Unable to combine PSBT file: ") + p.filename)
return
# Display result, might not be complete yet, but hopefully it's ready to transmit!
if len(psbts) == 1:
desc = _("From PSBT file: ") + fn
else:
desc = _("Combined from %d PSBT files") % len(psbts)
# need to associated our pluging to this wallet
ColdcardPlugin.link_wallet(wallet)
window.show_transaction(tx, desc)
class Coldcard_Handler(QtHandlerBase): class Coldcard_Handler(QtHandlerBase):
setup_signal = pyqtSignal() setup_signal = pyqtSignal()
@ -112,21 +248,25 @@ class Coldcard_Handler(QtHandlerBase):
return return
class CKCCSettingsDialog(WindowModalDialog): class CKCCSettingsDialog(WindowModalDialog):
'''This dialog doesn't require a device be paired with a wallet.
We want users to be able to wipe a device even if they've forgotten
their PIN.'''
def __init__(self, window, plugin, keystore, device_id): def __init__(self, window, plugin, keystore):
title = _("{} Settings").format(plugin.device) title = _("{} Settings").format(plugin.device)
super(CKCCSettingsDialog, self).__init__(window, title) super(CKCCSettingsDialog, self).__init__(window, title)
self.setMaximumWidth(540) self.setMaximumWidth(540)
# Note: Coldcard may **not** be connected at present time. Keep working!
devmgr = plugin.device_manager() devmgr = plugin.device_manager()
config = devmgr.config #config = devmgr.config
handler = keystore.handler #handler = keystore.handler
self.thread = thread = keystore.thread self.thread = thread = keystore.thread
self.keystore = keystore
def connect_and_doit(): def connect_and_doit():
# Attempt connection to device, or raise.
device_id = plugin.choose_device(window, keystore)
if not device_id:
raise RuntimeError("Device not connected")
client = devmgr.client_by_id(device_id) client = devmgr.client_by_id(device_id)
if not client: if not client:
raise RuntimeError("Device not connected") raise RuntimeError("Device not connected")
@ -148,13 +288,14 @@ class CKCCSettingsDialog(WindowModalDialog):
y = 3 y = 3
rows = [ rows = [
('xfp', _("Master Fingerprint")),
('serial', _("USB Serial")),
('fw_version', _("Firmware Version")), ('fw_version', _("Firmware Version")),
('fw_built', _("Build Date")), ('fw_built', _("Build Date")),
('bl_version', _("Bootloader")), ('bl_version', _("Bootloader")),
('xfp', _("Master Fingerprint")),
('serial', _("USB Serial")),
] ]
for row_num, (member_name, label) in enumerate(rows): for row_num, (member_name, label) in enumerate(rows):
# XXX we know xfp already, even if not connected
widget = QLabel('<tt>000000000000') widget = QLabel('<tt>000000000000')
widget.setTextInteractionFlags(Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard) widget.setTextInteractionFlags(Qt.TextSelectableByMouse | Qt.TextSelectableByKeyboard)
@ -164,7 +305,7 @@ class CKCCSettingsDialog(WindowModalDialog):
y += 1 y += 1
body_layout.addLayout(grid) body_layout.addLayout(grid)
upg_btn = QPushButton('Upgrade') upg_btn = QPushButton(_('Upgrade'))
#upg_btn.setDefault(False) #upg_btn.setDefault(False)
def _start_upgrade(): def _start_upgrade():
thread.add(connect_and_doit, on_success=self.start_upgrade) thread.add(connect_and_doit, on_success=self.start_upgrade)
@ -177,13 +318,22 @@ class CKCCSettingsDialog(WindowModalDialog):
dialog_vbox = QVBoxLayout(self) dialog_vbox = QVBoxLayout(self)
dialog_vbox.addWidget(body) dialog_vbox.addWidget(body)
# Fetch values and show them # Fetch firmware/versions values and show them.
thread.add(connect_and_doit, on_success=self.show_values) thread.add(connect_and_doit, on_success=self.show_values, on_error=self.show_placeholders)
def show_placeholders(self, unclear_arg):
# device missing, so hide lots of detail.
self.xfp.setText('<tt>%s' % xfp2str(self.keystore.ckcc_xfp))
self.serial.setText('(not connected)')
self.fw_version.setText('')
self.fw_built.setText('')
self.bl_version.setText('')
def show_values(self, client): def show_values(self, client):
dev = client.dev dev = client.dev
self.xfp.setText('<tt>0x%08x' % dev.master_fingerprint) self.xfp.setText('<tt>%s' % xfp2str(dev.master_fingerprint))
self.serial.setText('<tt>%s' % dev.serial) self.serial.setText('<tt>%s' % dev.serial)
# ask device for versions: allow extras for future # ask device for versions: allow extras for future

2
electrum/plugins/hw_wallet/qt.py

@ -265,4 +265,4 @@ class QtPluginBase(object):
else: else:
addr = uri.get('address') addr = uri.get('address')
keystore.thread.add(partial(plugin.show_address, wallet, addr, keystore)) keystore.thread.add(partial(plugin.show_address, wallet, addr, keystore))
receive_address_e.addButton("eye1.png", show_address, _("Show on {}").format(plugin.device)) receive_address_e.addButton("eye1.png", show_address, _("Show on {}").format(keystore.label))

Loading…
Cancel
Save