|
@ -10,7 +10,7 @@ import struct |
|
|
from electrum import bip32 |
|
|
from electrum import bip32 |
|
|
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes |
|
|
from electrum.bip32 import BIP32Node, InvalidMasterKeyVersionBytes |
|
|
from electrum.i18n import _ |
|
|
from electrum.i18n import _ |
|
|
from electrum.plugin import Device, hook |
|
|
from electrum.plugin import Device, hook, runs_in_hwd_thread |
|
|
from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK |
|
|
from electrum.keystore import Hardware_KeyStore, KeyStoreWithMPK |
|
|
from electrum.transaction import PartialTransaction |
|
|
from electrum.transaction import PartialTransaction |
|
|
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet |
|
|
from electrum.wallet import Standard_Wallet, Multisig_Wallet, Abstract_Wallet |
|
@ -72,9 +72,8 @@ class CKCCClient(HardwareClientBase): |
|
|
self.dev = ElectrumColdcardDevice(dev_path, encrypt=True) |
|
|
self.dev = ElectrumColdcardDevice(dev_path, encrypt=True) |
|
|
else: |
|
|
else: |
|
|
# open the real HID device |
|
|
# open the real HID device |
|
|
with self.device_manager().hid_lock: |
|
|
hd = hid.device(path=dev_path) |
|
|
hd = hid.device(path=dev_path) |
|
|
hd.open_path(dev_path) |
|
|
hd.open_path(dev_path) |
|
|
|
|
|
|
|
|
|
|
|
self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True) |
|
|
self.dev = ElectrumColdcardDevice(dev=hd, encrypt=True) |
|
|
|
|
|
|
|
@ -85,6 +84,7 @@ class CKCCClient(HardwareClientBase): |
|
|
return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint), |
|
|
return '<CKCCClient: xfp=%s label=%r>' % (xfp2str(self.dev.master_fingerprint), |
|
|
self.label()) |
|
|
self.label()) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def verify_connection(self, expected_xfp: int, expected_xpub=None): |
|
|
def verify_connection(self, expected_xfp: int, expected_xpub=None): |
|
|
ex = (expected_xfp, expected_xpub) |
|
|
ex = (expected_xfp, expected_xpub) |
|
|
|
|
|
|
|
@ -121,14 +121,10 @@ class CKCCClient(HardwareClientBase): |
|
|
# can't do anything w/ devices that aren't setup (this code not normally reachable) |
|
|
# can't do anything w/ devices that aren't setup (this code not normally reachable) |
|
|
return bool(self.dev.master_xpub) |
|
|
return bool(self.dev.master_xpub) |
|
|
|
|
|
|
|
|
def timeout(self, cutoff): |
|
|
@runs_in_hwd_thread |
|
|
# nothing to do? |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
def close(self): |
|
|
def close(self): |
|
|
# close the HID device (so can be reused) |
|
|
# close the HID device (so can be reused) |
|
|
with self.device_manager().hid_lock: |
|
|
self.dev.close() |
|
|
self.dev.close() |
|
|
|
|
|
self.dev = None |
|
|
self.dev = None |
|
|
|
|
|
|
|
|
def is_initialized(self): |
|
|
def is_initialized(self): |
|
@ -160,6 +156,7 @@ class CKCCClient(HardwareClientBase): |
|
|
|
|
|
|
|
|
return LabelStr(lab, self.dev.master_fingerprint, self.dev.master_xpub) |
|
|
return LabelStr(lab, self.dev.master_fingerprint, self.dev.master_xpub) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def has_usable_connection_with_device(self): |
|
|
def has_usable_connection_with_device(self): |
|
|
# Do end-to-end ping test |
|
|
# Do end-to-end ping test |
|
|
try: |
|
|
try: |
|
@ -168,6 +165,7 @@ class CKCCClient(HardwareClientBase): |
|
|
except: |
|
|
except: |
|
|
return False |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def get_xpub(self, bip32_path, xtype): |
|
|
def get_xpub(self, bip32_path, xtype): |
|
|
assert xtype in ColdcardPlugin.SUPPORTED_XTYPES |
|
|
assert xtype in ColdcardPlugin.SUPPORTED_XTYPES |
|
|
_logger.info('Derive xtype = %r' % xtype) |
|
|
_logger.info('Derive xtype = %r' % xtype) |
|
@ -183,6 +181,7 @@ class CKCCClient(HardwareClientBase): |
|
|
xpub = node._replace(xtype=xtype).to_xpub() |
|
|
xpub = node._replace(xtype=xtype).to_xpub() |
|
|
return xpub |
|
|
return xpub |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def ping_check(self): |
|
|
def ping_check(self): |
|
|
# check connection is working |
|
|
# check connection is working |
|
|
assert self.dev.session_key, 'not encrypted?' |
|
|
assert self.dev.session_key, 'not encrypted?' |
|
@ -193,26 +192,32 @@ class CKCCClient(HardwareClientBase): |
|
|
except: |
|
|
except: |
|
|
raise RuntimeError("Communication trouble with Coldcard") |
|
|
raise RuntimeError("Communication trouble with Coldcard") |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def show_address(self, path, addr_fmt): |
|
|
def show_address(self, path, addr_fmt): |
|
|
# prompt user w/ address, also returns it immediately. |
|
|
# prompt user w/ address, also returns it immediately. |
|
|
return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) |
|
|
return self.dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def show_p2sh_address(self, *args, **kws): |
|
|
def show_p2sh_address(self, *args, **kws): |
|
|
# prompt user w/ p2sh address, also returns it immediately. |
|
|
# prompt user w/ p2sh address, also returns it immediately. |
|
|
return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None) |
|
|
return self.dev.send_recv(CCProtocolPacker.show_p2sh_address(*args, **kws), timeout=None) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def get_version(self): |
|
|
def get_version(self): |
|
|
# gives list of strings |
|
|
# gives list of strings |
|
|
return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n') |
|
|
return self.dev.send_recv(CCProtocolPacker.version(), timeout=1000).split('\n') |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def sign_message_start(self, path, msg): |
|
|
def sign_message_start(self, path, msg): |
|
|
# this starts the UX experience. |
|
|
# this starts the UX experience. |
|
|
self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None) |
|
|
self.dev.send_recv(CCProtocolPacker.sign_message(msg, path), timeout=None) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def sign_message_poll(self): |
|
|
def sign_message_poll(self): |
|
|
# poll device... if user has approved, will get tuple: (addr, sig) else None |
|
|
# poll device... if user has approved, will get tuple: (addr, sig) else None |
|
|
return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) |
|
|
return self.dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def sign_transaction_start(self, raw_psbt: bytes, *, finalize: bool = False): |
|
|
def sign_transaction_start(self, raw_psbt: bytes, *, finalize: bool = False): |
|
|
# Multiple steps to sign: |
|
|
# Multiple steps to sign: |
|
|
# - upload binary |
|
|
# - upload binary |
|
@ -228,10 +233,12 @@ class CKCCClient(HardwareClientBase): |
|
|
if resp != None: |
|
|
if resp != None: |
|
|
raise ValueError(resp) |
|
|
raise ValueError(resp) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def sign_transaction_poll(self): |
|
|
def sign_transaction_poll(self): |
|
|
# poll device... if user has approved, will get tuple: (legnth, checksum) else None |
|
|
# poll device... if user has approved, will get tuple: (legnth, checksum) else None |
|
|
return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) |
|
|
return self.dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def download_file(self, length, checksum, file_number=1): |
|
|
def download_file(self, length, checksum, file_number=1): |
|
|
# get a file |
|
|
# get a file |
|
|
return self.dev.download_file(length, checksum, file_number=file_number) |
|
|
return self.dev.download_file(length, checksum, file_number=file_number) |
|
@ -317,7 +324,6 @@ class Coldcard_KeyStore(Hardware_KeyStore): |
|
|
% MSG_SIGNING_MAX_LENGTH) |
|
|
% MSG_SIGNING_MAX_LENGTH) |
|
|
return b'' |
|
|
return b'' |
|
|
|
|
|
|
|
|
client = self.get_client() |
|
|
|
|
|
path = self.get_derivation_prefix() + ("/%d/%d" % sequence) |
|
|
path = self.get_derivation_prefix() + ("/%d/%d" % sequence) |
|
|
try: |
|
|
try: |
|
|
cl = self.get_client() |
|
|
cl = self.get_client() |
|
@ -508,6 +514,7 @@ class ColdcardPlugin(HW_PluginBase): |
|
|
|
|
|
|
|
|
return [] |
|
|
return [] |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def create_client(self, device, handler): |
|
|
def create_client(self, device, handler): |
|
|
if handler: |
|
|
if handler: |
|
|
self.handler = handler |
|
|
self.handler = handler |
|
@ -539,6 +546,7 @@ class ColdcardPlugin(HW_PluginBase): |
|
|
xpub = client.get_xpub(derivation, xtype) |
|
|
xpub = client.get_xpub(derivation, xtype) |
|
|
return xpub |
|
|
return xpub |
|
|
|
|
|
|
|
|
|
|
|
@runs_in_hwd_thread |
|
|
def get_client(self, keystore, force_pair=True, *, |
|
|
def get_client(self, keystore, force_pair=True, *, |
|
|
devices=None, allow_user_interaction=True) -> Optional['CKCCClient']: |
|
|
devices=None, allow_user_interaction=True) -> Optional['CKCCClient']: |
|
|
# Acquire a connection to the hardware device (via USB) |
|
|
# Acquire a connection to the hardware device (via USB) |
|
|