Browse Source

Merge pull request #4875 from matejcik/trezor-0.11

WIP: Trezor 0.11
3.3.3.1
ghost43 6 years ago
committed by GitHub
parent
commit
1546d65ebe
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      contrib/build-wine/deterministic.spec
  2. 1
      contrib/osx/osx.spec
  3. 14
      electrum/plugins/hw_wallet/plugin.py
  4. 6
      electrum/plugins/trezor/__init__.py
  5. 11
      electrum/plugins/trezor/client.py
  6. 368
      electrum/plugins/trezor/clientbase.py
  7. 75
      electrum/plugins/trezor/qt.py
  8. 95
      electrum/plugins/trezor/transport.py
  9. 320
      electrum/plugins/trezor/trezor.py

1
contrib/build-wine/deterministic.spec

@ -57,7 +57,6 @@ a = Analysis([home+'run_electrum',
home+'electrum/commands.py',
home+'electrum/plugins/cosigner_pool/qt.py',
home+'electrum/plugins/email_requests/qt.py',
home+'electrum/plugins/trezor/client.py',
home+'electrum/plugins/trezor/qt.py',
home+'electrum/plugins/safe_t/client.py',
home+'electrum/plugins/safe_t/qt.py',

1
contrib/osx/osx.spec

@ -63,7 +63,6 @@ a = Analysis([electrum+ MAIN_SCRIPT,
electrum+'electrum/commands.py',
electrum+'electrum/plugins/cosigner_pool/qt.py',
electrum+'electrum/plugins/email_requests/qt.py',
electrum+'electrum/plugins/trezor/client.py',
electrum+'electrum/plugins/trezor/qt.py',
electrum+'electrum/plugins/safe_t/client.py',
electrum+'electrum/plugins/safe_t/qt.py',

14
electrum/plugins/hw_wallet/plugin.py

@ -90,6 +90,9 @@ class HW_PluginBase(BasePlugin):
raise NotImplementedError()
def check_libraries_available(self) -> bool:
def version_str(t):
return ".".join(str(i) for i in t)
try:
library_version = self.get_library_version()
except ImportError:
@ -99,9 +102,18 @@ class HW_PluginBase(BasePlugin):
self.libraries_available_message = (
_("Library version for '{}' is too old.").format(self.name)
+ '\nInstalled: {}, Needed: {}'
.format(library_version, self.minimum_library))
.format(library_version, version_str(self.minimum_library)))
self.print_stderr(self.libraries_available_message)
return False
elif hasattr(self, "maximum_library") and \
versiontuple(library_version) >= self.maximum_library:
self.libraries_available_message = (
_("Library version for '{}' is incompatible.").format(self.name)
+ '\nInstalled: {}, Needed: less than {}'
.format(library_version, version_str(self.maximum_library)))
self.print_stderr(self.libraries_available_message)
return False
return True
def get_library_not_available_message(self) -> str:

6
electrum/plugins/trezor/__init__.py

@ -1,8 +1,8 @@
from electrum.i18n import _
fullname = 'TREZOR Wallet'
description = _('Provides support for TREZOR hardware wallet')
fullname = 'Trezor Wallet'
description = _('Provides support for Trezor hardware wallet')
requires = [('trezorlib','github.com/trezor/python-trezor')]
registers_keystore = ('hardware', 'trezor', _("TREZOR wallet"))
registers_keystore = ('hardware', 'trezor', _("Trezor wallet"))
available_for = ['qt', 'cmdline']

11
electrum/plugins/trezor/client.py

@ -1,11 +0,0 @@
from trezorlib.client import proto, BaseClient, ProtocolMixin
from .clientbase import TrezorClientBase
class TrezorClient(TrezorClientBase, ProtocolMixin, BaseClient):
def __init__(self, transport, handler, plugin):
BaseClient.__init__(self, transport=transport)
ProtocolMixin.__init__(self, transport=transport)
TrezorClientBase.__init__(self, handler, plugin, proto)
TrezorClientBase.wrap_methods(TrezorClient)

368
electrum/plugins/trezor/clientbase.py

@ -2,120 +2,80 @@ import time
from struct import pack
from electrum.i18n import _
from electrum.util import PrintError, UserCancelled
from electrum.util import PrintError, UserCancelled, UserFacingException
from electrum.keystore import bip39_normalize_passphrase
from electrum.bip32 import serialize_xpub, convert_bip32_path_to_list_of_uint32
class GuiMixin(object):
# Requires: self.proto, self.device
# ref: https://github.com/trezor/trezor-common/blob/44dfb07cfaafffada4b2ce0d15ba1d90d17cf35e/protob/types.proto#L89
messages = {
3: _("Confirm the transaction output on your {} device"),
4: _("Confirm internal entropy on your {} device to begin"),
5: _("Write down the seed word shown on your {}"),
6: _("Confirm on your {} that you want to wipe it clean"),
7: _("Confirm on your {} device the message to sign"),
8: _("Confirm the total amount spent and the transaction fee on your "
"{} device"),
10: _("Confirm wallet address on your {} device"),
14: _("Choose on your {} device where to enter your passphrase"),
'default': _("Check your {} device to continue"),
}
def callback_Failure(self, msg):
# BaseClient's unfortunate call() implementation forces us to
# raise exceptions on failure in order to unwind the stack.
# However, making the user acknowledge they cancelled
# gets old very quickly, so we suppress those. The NotInitialized
# one is misnamed and indicates a passphrase request was cancelled.
if msg.code in (self.types.FailureType.PinCancelled,
self.types.FailureType.ActionCancelled,
self.types.FailureType.NotInitialized):
raise UserCancelled()
raise RuntimeError(msg.message)
def callback_ButtonRequest(self, msg):
message = self.msg
if not message:
message = self.messages.get(msg.code, self.messages['default'])
self.handler.show_message(message.format(self.device), self.cancel)
return self.proto.ButtonAck()
def callback_PinMatrixRequest(self, msg):
if msg.type == 2:
msg = _("Enter a new PIN for your {}:")
elif msg.type == 3:
msg = (_("Re-enter the new PIN for your {}.\n\n"
"NOTE: the positions of the numbers have changed!"))
else:
msg = _("Enter your current {} PIN:")
pin = self.handler.get_pin(msg.format(self.device))
if len(pin) > 9:
self.handler.show_error(_('The PIN cannot be longer than 9 characters.'))
pin = '' # to cancel below
if not pin:
return self.proto.Cancel()
return self.proto.PinMatrixAck(pin=pin)
from electrum.bip32 import serialize_xpub, convert_bip32_path_to_list_of_uint32 as parse_path
from trezorlib.client import TrezorClient
from trezorlib.exceptions import TrezorFailure, Cancelled, OutdatedFirmwareError
from trezorlib.messages import WordRequestType, FailureType, RecoveryDeviceType
import trezorlib.btc
import trezorlib.device
MESSAGES = {
3: _("Confirm the transaction output on your {} device"),
4: _("Confirm internal entropy on your {} device to begin"),
5: _("Write down the seed word shown on your {}"),
6: _("Confirm on your {} that you want to wipe it clean"),
7: _("Confirm on your {} device the message to sign"),
8: _("Confirm the total amount spent and the transaction fee on your {} device"),
10: _("Confirm wallet address on your {} device"),
14: _("Choose on your {} device where to enter your passphrase"),
'default': _("Check your {} device to continue"),
}
class TrezorClientBase(PrintError):
def __init__(self, transport, handler, plugin):
self.client = TrezorClient(transport, ui=self)
self.plugin = plugin
self.device = plugin.device
self.handler = handler
def callback_PassphraseRequest(self, req):
if req and hasattr(req, 'on_device') and req.on_device is True:
return self.proto.PassphraseAck()
self.msg = None
self.creating_wallet = False
if self.creating_wallet:
msg = _("Enter a passphrase to generate this wallet. Each time "
"you use this wallet your {} will prompt you for the "
"passphrase. If you forget the passphrase you cannot "
"access the bitcoins in the wallet.").format(self.device)
else:
msg = _("Enter the passphrase to unlock this wallet:")
passphrase = self.handler.get_passphrase(msg, self.creating_wallet)
if passphrase is None:
return self.proto.Cancel()
passphrase = bip39_normalize_passphrase(passphrase)
self.in_flow = False
ack = self.proto.PassphraseAck(passphrase=passphrase)
length = len(ack.passphrase)
if length > 50:
self.handler.show_error(_("Too long passphrase ({} > 50 chars).").format(length))
return self.proto.Cancel()
return ack
def callback_PassphraseStateRequest(self, msg):
return self.proto.PassphraseStateAck()
def callback_WordRequest(self, msg):
if (msg.type is not None
and msg.type in (self.types.WordRequestType.Matrix9,
self.types.WordRequestType.Matrix6)):
num = 9 if msg.type == self.types.WordRequestType.Matrix9 else 6
char = self.handler.get_matrix(num)
if char == 'x':
return self.proto.Cancel()
return self.proto.WordAck(word=char)
self.step += 1
msg = _("Step {}/24. Enter seed word as explained on "
"your {}:").format(self.step, self.device)
word = self.handler.get_word(msg)
# Unfortunately the device can't handle self.proto.Cancel()
return self.proto.WordAck(word=word)
class TrezorClientBase(GuiMixin, PrintError):
def __init__(self, handler, plugin, proto):
assert hasattr(self, 'tx_api') # ProtocolMixin already constructed?
self.proto = proto
self.device = plugin.device
self.handler = handler
self.tx_api = plugin
self.types = plugin.types
self.used()
def run_flow(self, message=None, creating_wallet=False):
if self.in_flow:
raise RuntimeError("Overlapping call to run_flow")
self.in_flow = True
self.msg = message
self.creating_wallet = creating_wallet
self.prevent_timeouts()
return self
def end_flow(self):
self.in_flow = False
self.msg = None
self.creating_wallet = False
self.handler.finished()
self.used()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.end_flow()
if exc_value is not None:
if issubclass(exc_type, Cancelled):
raise UserCancelled from exc_value
elif issubclass(exc_type, TrezorFailure):
raise RuntimeError(exc_value.message) from exc_value
elif issubclass(exc_type, OutdatedFirmwareError):
raise UserFacingException(exc_value) from exc_value
else:
return False
return True
@property
def features(self):
return self.client.features
def __str__(self):
return "%s/%s" % (self.label(), self.features.device_id)
@ -131,8 +91,11 @@ class TrezorClientBase(GuiMixin, PrintError):
return not self.features.bootloader_mode
def has_usable_connection_with_device(self):
if self.in_flow:
return True
try:
res = self.ping("electrum pinging device")
res = self.client.ping("electrum pinging device")
assert res == "electrum pinging device"
except BaseException:
return False
@ -150,47 +113,41 @@ class TrezorClientBase(GuiMixin, PrintError):
self.print_error("timed out")
self.clear_session()
@staticmethod
def expand_path(n):
return convert_bip32_path_to_list_of_uint32(n)
def cancel(self):
'''Provided here as in keepkeylib but not trezorlib.'''
self.transport.write(self.proto.Cancel())
def i4b(self, x):
return pack('>I', x)
def get_xpub(self, bip32_path, xtype):
address_n = self.expand_path(bip32_path)
creating = False
node = self.get_public_node(address_n, creating).node
def get_xpub(self, bip32_path, xtype, creating=False):
address_n = parse_path(bip32_path)
with self.run_flow(creating_wallet=creating):
node = trezorlib.btc.get_public_node(self.client, address_n).node
return serialize_xpub(xtype, node.chain_code, node.public_key, node.depth, self.i4b(node.fingerprint), self.i4b(node.child_num))
def toggle_passphrase(self):
if self.features.passphrase_protection:
self.msg = _("Confirm on your {} device to disable passphrases")
msg = _("Confirm on your {} device to disable passphrases")
else:
self.msg = _("Confirm on your {} device to enable passphrases")
msg = _("Confirm on your {} device to enable passphrases")
enabled = not self.features.passphrase_protection
self.apply_settings(use_passphrase=enabled)
with self.run_flow(msg):
trezorlib.device.apply_settings(self.client, use_passphrase=enabled)
def change_label(self, label):
self.msg = _("Confirm the new label on your {} device")
self.apply_settings(label=label)
with self.run_flow(_("Confirm the new label on your {} device")):
trezorlib.device.apply_settings(self.client, label=label)
def change_homescreen(self, homescreen):
self.msg = _("Confirm on your {} device to change your home screen")
self.apply_settings(homescreen=homescreen)
with self.run_flow(_("Confirm on your {} device to change your home screen")):
trezorlib.device.apply_settings(self.client, homescreen=homescreen)
def set_pin(self, remove):
if remove:
self.msg = _("Confirm on your {} device to disable PIN protection")
msg = _("Confirm on your {} device to disable PIN protection")
elif self.features.pin_protection:
self.msg = _("Confirm on your {} device to change your PIN")
msg = _("Confirm on your {} device to change your PIN")
else:
self.msg = _("Confirm on your {} device to set a PIN")
self.change_pin(remove)
msg = _("Confirm on your {} device to set a PIN")
with self.run_flow(msg):
trezorlib.device.change_pin(remove)
def clear_session(self):
'''Clear the session to force pin (and passphrase if enabled)
@ -198,54 +155,131 @@ class TrezorClientBase(GuiMixin, PrintError):
self.print_error("clear session:", self)
self.prevent_timeouts()
try:
super(TrezorClientBase, self).clear_session()
self.client.clear_session()
except BaseException as e:
# If the device was removed it has the same effect...
self.print_error("clear_session: ignoring error", str(e))
def get_public_node(self, address_n, creating):
self.creating_wallet = creating
return super(TrezorClientBase, self).get_public_node(address_n)
def close(self):
'''Called when Our wallet was closed or the device removed.'''
self.print_error("closing client")
self.clear_session()
# Release the device
self.transport.close()
def firmware_version(self):
f = self.features
return (f.major_version, f.minor_version, f.patch_version)
def atleast_version(self, major, minor=0, patch=0):
return self.firmware_version() >= (major, minor, patch)
def is_uptodate(self):
if self.client.is_outdated():
return False
return self.client.version >= self.plugin.minimum_firmware
def get_trezor_model(self):
"""Returns '1' for Trezor One, 'T' for Trezor T."""
return self.features.model
@staticmethod
def wrapper(func):
'''Wrap methods to clear any message box they opened.'''
def wrapped(self, *args, **kwargs):
try:
self.prevent_timeouts()
return func(self, *args, **kwargs)
finally:
self.used()
self.handler.finished()
self.creating_wallet = False
self.msg = None
return wrapped
@staticmethod
def wrap_methods(cls):
for method in ['apply_settings', 'change_pin',
'get_address', 'get_public_node',
'load_device_by_mnemonic', 'load_device_by_xprv',
'recovery_device', 'reset_device', 'sign_message',
'sign_tx', 'wipe_device']:
setattr(cls, method, cls.wrapper(getattr(cls, method)))
def show_address(self, address_str, script_type, multisig=None):
coin_name = self.plugin.get_coin_name()
address_n = parse_path(address_str)
with self.run_flow():
return trezorlib.btc.get_address(
self.client,
coin_name,
address_n,
show_display=True,
script_type=script_type,
multisig=multisig)
def sign_message(self, address_str, message):
coin_name = self.plugin.get_coin_name()
address_n = parse_path(address_str)
with self.run_flow():
return trezorlib.btc.sign_message(
self.client,
coin_name,
address_n,
message)
def recover_device(self, recovery_type, *args, **kwargs):
input_callback = self.mnemonic_callback(recovery_type)
with self.run_flow():
return trezorlib.device.recover(
self.client,
*args,
input_callback=input_callback,
**kwargs)
# ========= Unmodified trezorlib methods =========
def sign_tx(self, *args, **kwargs):
with self.run_flow():
return trezorlib.btc.sign_tx(self.client, *args, **kwargs)
def reset_device(self, *args, **kwargs):
with self.run_flow():
return trezorlib.device.reset(self.client, *args, **kwargs)
def wipe_device(self, *args, **kwargs):
with self.run_flow():
return trezorlib.device.wipe(self.client, *args, **kwargs)
# ========= UI methods ==========
def button_request(self, code):
message = self.msg or MESSAGES.get(code) or MESSAGES['default']
self.handler.show_message(message.format(self.device), self.client.cancel)
def get_pin(self, code=None):
if code == 2:
msg = _("Enter a new PIN for your {}:")
elif code == 3:
msg = (_("Re-enter the new PIN for your {}.\n\n"
"NOTE: the positions of the numbers have changed!"))
else:
msg = _("Enter your current {} PIN:")
pin = self.handler.get_pin(msg.format(self.device))
if not pin:
raise Cancelled
if len(pin) > 9:
self.handler.show_error(_('The PIN cannot be longer than 9 characters.'))
raise Cancelled
return pin
def get_passphrase(self):
if self.creating_wallet:
msg = _("Enter a passphrase to generate this wallet. Each time "
"you use this wallet your {} will prompt you for the "
"passphrase. If you forget the passphrase you cannot "
"access the bitcoins in the wallet.").format(self.device)
else:
msg = _("Enter the passphrase to unlock this wallet:")
passphrase = self.handler.get_passphrase(msg, self.creating_wallet)
if passphrase is None:
raise Cancelled
passphrase = bip39_normalize_passphrase(passphrase)
length = len(passphrase)
if length > 50:
self.handler.show_error(_("Too long passphrase ({} > 50 chars).").format(length))
raise Cancelled
return passphrase
def _matrix_char(self, matrix_type):
num = 9 if matrix_type == WordRequestType.Matrix9 else 6
char = self.handler.get_matrix(num)
if char == 'x':
raise Cancelled
return char
def mnemonic_callback(self, recovery_type):
if recovery_type is None:
return None
if recovery_type == RecoveryDeviceType.Matrix:
return self._matrix_char
step = 0
def word_callback(_ignored):
nonlocal step
step += 1
msg = _("Step {}/24. Enter seed word as explained on your {}:").format(step, self.device)
word = self.handler.get_word(msg)
if not word:
raise Cancelled
return word
return word_callback

75
electrum/plugins/trezor/qt.py

@ -12,7 +12,7 @@ from electrum.util import bh2u
from ..hw_wallet.qt import QtHandlerBase, QtPluginBase
from ..hw_wallet.plugin import only_hook_if_libraries_available
from .trezor import (TrezorPlugin, TIM_NEW, TIM_RECOVER, TIM_MNEMONIC,
from .trezor import (TrezorPlugin, TIM_NEW, TIM_RECOVER,
RECOVERY_TYPE_SCRAMBLED_WORDS, RECOVERY_TYPE_MATRIX)
@ -197,50 +197,24 @@ class QtPlugin(QtPluginBase):
text = widget.toPlainText().strip()
return ' '.join(text.split())
if method in [TIM_NEW, TIM_RECOVER]:
gb = QGroupBox()
hbox1 = QHBoxLayout()
gb.setLayout(hbox1)
vbox.addWidget(gb)
gb.setTitle(_("Select your seed length:"))
bg_numwords = QButtonGroup()
for i, count in enumerate([12, 18, 24]):
rb = QRadioButton(gb)
rb.setText(_("%d words") % count)
bg_numwords.addButton(rb)
bg_numwords.setId(rb, i)
hbox1.addWidget(rb)
rb.setChecked(True)
cb_pin = QCheckBox(_('Enable PIN protection'))
cb_pin.setChecked(True)
else:
text = QTextEdit()
text.setMaximumHeight(60)
if method == TIM_MNEMONIC:
msg = _("Enter your BIP39 mnemonic:")
else:
msg = _("Enter the master private key beginning with xprv:")
def set_enabled():
from electrum.bip32 import is_xprv
wizard.next_button.setEnabled(is_xprv(clean_text(text)))
text.textChanged.connect(set_enabled)
next_enabled = False
vbox.addWidget(QLabel(msg))
vbox.addWidget(text)
pin = QLineEdit()
pin.setValidator(QRegExpValidator(QRegExp('[1-9]{0,9}')))
pin.setMaximumWidth(100)
hbox_pin = QHBoxLayout()
hbox_pin.addWidget(QLabel(_("Enter your PIN (digits 1-9):")))
hbox_pin.addWidget(pin)
hbox_pin.addStretch(1)
if method in [TIM_NEW, TIM_RECOVER]:
vbox.addWidget(WWLabel(RECOMMEND_PIN))
vbox.addWidget(cb_pin)
else:
vbox.addLayout(hbox_pin)
gb = QGroupBox()
hbox1 = QHBoxLayout()
gb.setLayout(hbox1)
vbox.addWidget(gb)
gb.setTitle(_("Select your seed length:"))
bg_numwords = QButtonGroup()
for i, count in enumerate([12, 18, 24]):
rb = QRadioButton(gb)
rb.setText(_("%d words") % count)
bg_numwords.addButton(rb)
bg_numwords.setId(rb, i)
hbox1.addWidget(rb)
rb.setChecked(True)
cb_pin = QCheckBox(_('Enable PIN protection'))
cb_pin.setChecked(True)
vbox.addWidget(WWLabel(RECOMMEND_PIN))
vbox.addWidget(cb_pin)
passphrase_msg = WWLabel(PASSPHRASE_HELP_SHORT)
passphrase_warning = WWLabel(PASSPHRASE_NOT_PIN)
@ -277,14 +251,9 @@ class QtPlugin(QtPluginBase):
wizard.exec_layout(vbox, next_enabled=next_enabled)
if method in [TIM_NEW, TIM_RECOVER]:
item = bg_numwords.checkedId()
pin = cb_pin.isChecked()
recovery_type = bg_rectype.checkedId() if bg_rectype else None
else:
item = ' '.join(str(clean_text(text)).split())
pin = str(pin.text())
recovery_type = None
item = bg_numwords.checkedId()
pin = cb_pin.isChecked()
recovery_type = bg_rectype.checkedId() if bg_rectype else None
return (item, name.text(), pin, cb_phrase.isChecked(), recovery_type)

95
electrum/plugins/trezor/transport.py

@ -1,95 +0,0 @@
from electrum.util import PrintError
class TrezorTransport(PrintError):
@staticmethod
def all_transports():
"""Reimplemented trezorlib.transport.all_transports so that we can
enable/disable specific transports.
"""
try:
# only to detect trezorlib version
from trezorlib.transport import all_transports
except ImportError:
# old trezorlib. compat for trezorlib < 0.9.2
transports = []
try:
from trezorlib.transport_bridge import BridgeTransport
transports.append(BridgeTransport)
except BaseException:
pass
try:
from trezorlib.transport_hid import HidTransport
transports.append(HidTransport)
except BaseException:
pass
try:
from trezorlib.transport_udp import UdpTransport
transports.append(UdpTransport)
except BaseException:
pass
try:
from trezorlib.transport_webusb import WebUsbTransport
transports.append(WebUsbTransport)
except BaseException:
pass
else:
# new trezorlib.
transports = []
try:
from trezorlib.transport.bridge import BridgeTransport
transports.append(BridgeTransport)
except BaseException:
pass
try:
from trezorlib.transport.hid import HidTransport
transports.append(HidTransport)
except BaseException:
pass
try:
from trezorlib.transport.udp import UdpTransport
transports.append(UdpTransport)
except BaseException:
pass
try:
from trezorlib.transport.webusb import WebUsbTransport
transports.append(WebUsbTransport)
except BaseException:
pass
return transports
return transports
def enumerate_devices(self):
"""Just like trezorlib.transport.enumerate_devices,
but with exception catching, so that transports can fail separately.
"""
devices = []
for transport in self.all_transports():
try:
new_devices = transport.enumerate()
except BaseException as e:
self.print_error('enumerate failed for {}. error {}'
.format(transport.__name__, str(e)))
else:
devices.extend(new_devices)
return devices
def get_transport(self, path=None):
"""Reimplemented trezorlib.transport.get_transport,
(1) for old trezorlib
(2) to be able to disable specific transports
(3) to call our own enumerate_devices that catches exceptions
"""
if path is None:
try:
return self.enumerate_devices()[0]
except IndexError:
raise Exception("No TREZOR device found") from None
def match_prefix(a, b):
return a.startswith(b) or b.startswith(a)
transports = [t for t in self.all_transports() if match_prefix(path, t.PATH_PREFIX)]
if transports:
return transports[0].find_by_path(path)
raise Exception("Unknown path prefix '%s'" % path)

320
electrum/plugins/trezor/trezor.py

@ -1,29 +1,51 @@
from binascii import hexlify, unhexlify
import traceback
import sys
from electrum.util import bfh, bh2u, versiontuple, UserCancelled, UserFacingException
from electrum.bitcoin import TYPE_ADDRESS, TYPE_SCRIPT
from electrum.bip32 import deserialize_xpub
from electrum.bip32 import deserialize_xpub, convert_bip32_path_to_list_of_uint32 as parse_path
from electrum import constants
from electrum.i18n import _
from electrum.plugin import Device
from electrum.transaction import deserialize, Transaction
from electrum.keystore import Hardware_KeyStore, is_xpubkey, parse_xpubkey
from electrum.base_wizard import ScriptTypeNotSupported
from electrum.base_wizard import ScriptTypeNotSupported, HWD_SETUP_NEW_WALLET
from ..hw_wallet import HW_PluginBase
from ..hw_wallet.plugin import is_any_tx_output_on_change_branch, trezor_validate_op_return_output_and_get_data
try:
import trezorlib
import trezorlib.transport
# TREZOR initialization methods
TIM_NEW, TIM_RECOVER, TIM_MNEMONIC, TIM_PRIVKEY = range(0, 4)
RECOVERY_TYPE_SCRAMBLED_WORDS, RECOVERY_TYPE_MATRIX = range(0, 2)
from .clientbase import TrezorClientBase
from trezorlib.messages import (
RecoveryDeviceType, HDNodeType, HDNodePathType,
InputScriptType, OutputScriptType, MultisigRedeemScriptType,
TxInputType, TxOutputType, TxOutputBinType, TransactionType, SignTx)
RECOVERY_TYPE_SCRAMBLED_WORDS = RecoveryDeviceType.ScrambledWords
RECOVERY_TYPE_MATRIX = RecoveryDeviceType.Matrix
TREZORLIB = True
except Exception as e:
import traceback
traceback.print_exc()
TREZORLIB = False
RECOVERY_TYPE_SCRAMBLED_WORDS, RECOVERY_TYPE_MATRIX = range(2)
# Trezor initialization methods
TIM_NEW, TIM_RECOVER = range(2)
TREZOR_PRODUCT_KEY = 'Trezor'
class TrezorKeyStore(Hardware_KeyStore):
hw_type = 'trezor'
device = 'TREZOR'
device = TREZOR_PRODUCT_KEY
def get_derivation(self):
return self.derivation
@ -37,8 +59,7 @@ class TrezorKeyStore(Hardware_KeyStore):
def sign_message(self, sequence, message, password):
client = self.get_client()
address_path = self.get_derivation() + "/%d/%d"%sequence
address_n = client.expand_path(address_path)
msg_sig = client.sign_message(self.plugin.get_coin_name(), address_n, message)
msg_sig = client.sign_message(address_path, message)
return msg_sig.signature
def sign_transaction(self, tx, password):
@ -75,41 +96,35 @@ class TrezorPlugin(HW_PluginBase):
libraries_URL = 'https://github.com/trezor/python-trezor'
minimum_firmware = (1, 5, 2)
keystore_class = TrezorKeyStore
minimum_library = (0, 9, 0)
minimum_library = (0, 11, 0)
maximum_library = (0, 12)
SUPPORTED_XTYPES = ('standard', 'p2wpkh-p2sh', 'p2wpkh', 'p2wsh-p2sh', 'p2wsh')
DEVICE_IDS = (TREZOR_PRODUCT_KEY,)
MAX_LABEL_LEN = 32
def __init__(self, parent, config, name):
HW_PluginBase.__init__(self, parent, config, name)
super().__init__(parent, config, name)
self.libraries_available = self.check_libraries_available()
if not self.libraries_available:
return
from . import client
from . import transport
import trezorlib.messages
self.client_class = client.TrezorClient
self.types = trezorlib.messages
self.DEVICE_IDS = ('TREZOR',)
self.transport_handler = transport.TrezorTransport()
self.device_manager().register_enumerate_func(self.enumerate)
def get_library_version(self):
import trezorlib
if not TREZORLIB:
raise ImportError
try:
return trezorlib.__version__
except AttributeError:
except Exception:
return 'unknown'
def enumerate(self):
devices = self.transport_handler.enumerate_devices()
devices = trezorlib.transport.enumerate_devices()
return [Device(path=d.get_path(),
interface_number=-1,
id_=d.get_path(),
product_key='TREZOR',
product_key=TREZOR_PRODUCT_KEY,
usage_page=0,
transport_ui_string=d.get_path())
for d in devices]
@ -117,7 +132,7 @@ class TrezorPlugin(HW_PluginBase):
def create_client(self, device, handler):
try:
self.print_error("connecting to device at", device.path)
transport = self.transport_handler.get_transport(device.path)
transport = trezorlib.transport.get_transport(device.path)
except BaseException as e:
self.print_error("cannot connect at", device.path, str(e))
return None
@ -128,27 +143,7 @@ class TrezorPlugin(HW_PluginBase):
self.print_error("connected to device at", device.path)
# note that this call can still raise!
client = self.client_class(transport, handler, self)
# Try a ping for device sanity
try:
client.ping('t')
except BaseException as e:
self.print_error("ping failed", str(e))
return None
if not client.atleast_version(*self.minimum_firmware):
msg = (_('Outdated {} firmware for device labelled {}. Please '
'download the updated firmware from {}')
.format(self.device, client.label(), self.firmware_URL))
self.print_error(msg)
if handler:
handler.show_error(msg)
else:
raise UserFacingException(msg)
return None
return client
return TrezorClientBase(transport, handler, self)
def get_client(self, keystore, force_pair=True):
devmgr = self.device_manager()
@ -177,8 +172,6 @@ class TrezorPlugin(HW_PluginBase):
# Must be short as QT doesn't word-wrap radio button text
(TIM_NEW, _("Let the device generate a completely new seed randomly")),
(TIM_RECOVER, _("Recover from a seed you have previously written down")),
(TIM_MNEMONIC, _("Upload a BIP39 mnemonic to generate the seed")),
(TIM_PRIVKEY, _("Upload a master private key"))
]
devmgr = self.device_manager()
client = devmgr.client_by_id(device_id)
@ -222,49 +215,37 @@ class TrezorPlugin(HW_PluginBase):
"the words carefully!"),
blocking=True)
language = 'english'
devmgr = self.device_manager()
client = devmgr.client_by_id(device_id)
if method == TIM_NEW:
strength = 64 * (item + 2) # 128, 192 or 256
u2f_counter = 0
skip_backup = False
client.reset_device(True, strength, passphrase_protection,
pin_protection, label, language,
u2f_counter, skip_backup)
client.reset_device(
strength=64 * (item + 2), # 128, 192 or 256
passphrase_protection=passphrase_protection,
pin_protection=pin_protection,
label=label)
elif method == TIM_RECOVER:
word_count = 6 * (item + 2) # 12, 18 or 24
client.step = 0
if recovery_type == RECOVERY_TYPE_SCRAMBLED_WORDS:
recovery_type_trezor = self.types.RecoveryDeviceType.ScrambledWords
else:
recovery_type_trezor = self.types.RecoveryDeviceType.Matrix
client.recovery_device(word_count, passphrase_protection,
pin_protection, label, language,
type=recovery_type_trezor)
client.recover_device(
recovery_type=recovery_type,
word_count=6 * (item + 2), # 12, 18 or 24
passphrase_protection=passphrase_protection,
pin_protection=pin_protection,
label=label)
if recovery_type == RECOVERY_TYPE_MATRIX:
handler.close_matrix_dialog()
elif method == TIM_MNEMONIC:
pin = pin_protection # It's the pin, not a boolean
client.load_device_by_mnemonic(str(item), pin,
passphrase_protection,
label, language)
else:
pin = pin_protection # It's the pin, not a boolean
client.load_device_by_xprv(item, pin, passphrase_protection,
label, language)
raise RuntimeError("Unsupported recovery method")
def _make_node_path(self, xpub, address_n):
_, depth, fingerprint, child_num, chain_code, key = deserialize_xpub(xpub)
node = self.types.HDNodeType(
node = HDNodeType(
depth=depth,
fingerprint=int.from_bytes(fingerprint, 'big'),
child_num=int.from_bytes(child_num, 'big'),
chain_code=chain_code,
public_key=key,
)
return self.types.HDNodePathType(node=node, address_n=address_n)
return HDNodePathType(node=node, address_n=address_n)
def setup_device(self, device_info, wizard, purpose):
devmgr = self.device_manager()
@ -273,11 +254,19 @@ class TrezorPlugin(HW_PluginBase):
if client is None:
raise UserFacingException(_('Failed to create a client for this device.') + '\n' +
_('Make sure it is in the correct state.'))
if not client.is_uptodate():
msg = (_('Outdated {} firmware for device labelled {}. Please '
'download the updated firmware from {}')
.format(self.device, client.label(), self.firmware_URL))
raise UserFacingException(msg)
# fixme: we should use: client.handler = wizard
client.handler = self.create_handler(wizard)
if not device_info.initialized:
self.initialize_device(device_id, wizard, client.handler)
client.get_xpub('m', 'standard')
is_creating_wallet = purpose == HWD_SETUP_NEW_WALLET
client.get_xpub('m', 'standard', creating=is_creating_wallet)
client.used()
def get_xpub(self, device_id, derivation, xtype, wizard):
@ -292,33 +281,33 @@ class TrezorPlugin(HW_PluginBase):
def get_trezor_input_script_type(self, electrum_txin_type: str):
if electrum_txin_type in ('p2wpkh', 'p2wsh'):
return self.types.InputScriptType.SPENDWITNESS
return InputScriptType.SPENDWITNESS
if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
return self.types.InputScriptType.SPENDP2SHWITNESS
return InputScriptType.SPENDP2SHWITNESS
if electrum_txin_type in ('p2pkh', ):
return self.types.InputScriptType.SPENDADDRESS
return InputScriptType.SPENDADDRESS
if electrum_txin_type in ('p2sh', ):
return self.types.InputScriptType.SPENDMULTISIG
return InputScriptType.SPENDMULTISIG
raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
def get_trezor_output_script_type(self, electrum_txin_type: str):
if electrum_txin_type in ('p2wpkh', 'p2wsh'):
return self.types.OutputScriptType.PAYTOWITNESS
return OutputScriptType.PAYTOWITNESS
if electrum_txin_type in ('p2wpkh-p2sh', 'p2wsh-p2sh'):
return self.types.OutputScriptType.PAYTOP2SHWITNESS
return OutputScriptType.PAYTOP2SHWITNESS
if electrum_txin_type in ('p2pkh', ):
return self.types.OutputScriptType.PAYTOADDRESS
return OutputScriptType.PAYTOADDRESS
if electrum_txin_type in ('p2sh', ):
return self.types.OutputScriptType.PAYTOMULTISIG
return OutputScriptType.PAYTOMULTISIG
raise ValueError('unexpected txin type: {}'.format(electrum_txin_type))
def sign_transaction(self, keystore, tx, prev_tx, xpub_path):
self.prev_tx = prev_tx
self.xpub_path = xpub_path
prev_tx = { bfh(txhash): self.electrum_tx_to_txtype(tx, xpub_path) for txhash, tx in prev_tx.items() }
client = self.get_client(keystore)
inputs = self.tx_inputs(tx, True)
inputs = self.tx_inputs(tx, xpub_path, True)
outputs = self.tx_outputs(keystore.get_derivation(), tx)
signatures = client.sign_tx(self.get_coin_name(), inputs, outputs, lock_time=tx.locktime)[0]
details = SignTx(lock_time=tx.locktime)
signatures, _ = client.sign_tx(self.get_coin_name(), inputs, outputs, details=details, prev_txes=prev_tx)
signatures = [(bh2u(x) + '01') for x in signatures]
tx.update_signatures(signatures)
@ -327,74 +316,50 @@ class TrezorPlugin(HW_PluginBase):
keystore = wallet.get_keystore()
if not self.show_address_helper(wallet, address, keystore):
return
client = self.get_client(keystore)
if not client.atleast_version(1, 3):
keystore.handler.show_error(_("Your device firmware is too old"))
return
change, index = wallet.get_address_index(address)
deriv_suffix = wallet.get_address_index(address)
derivation = keystore.derivation
address_path = "%s/%d/%d"%(derivation, change, index)
address_n = client.expand_path(address_path)
address_path = "%s/%d/%d"%(derivation, *deriv_suffix)
script_type = self.get_trezor_input_script_type(wallet.txin_type)
# prepare multisig, if available:
xpubs = wallet.get_master_public_keys()
if len(xpubs) == 1:
script_type = self.get_trezor_input_script_type(wallet.txin_type)
client.get_address(self.get_coin_name(), address_n, True, script_type=script_type)
else:
def f(xpub):
return self._make_node_path(xpub, [change, index])
if len(xpubs) > 1:
pubkeys = wallet.get_public_keys(address)
# sort xpubs using the order of pubkeys
sorted_pubkeys, sorted_xpubs = zip(*sorted(zip(pubkeys, xpubs)))
pubkeys = list(map(f, sorted_xpubs))
multisig = self.types.MultisigRedeemScriptType(
pubkeys=pubkeys,
signatures=[b''] * wallet.n,
m=wallet.m,
)
script_type = self.get_trezor_input_script_type(wallet.txin_type)
client.get_address(self.get_coin_name(), address_n, True, multisig=multisig, script_type=script_type)
def tx_inputs(self, tx, for_sig=False):
sorted_pairs = sorted(zip(pubkeys, xpubs))
multisig = self._make_multisig(
wallet.m,
[(xpub, deriv_suffix) for _, xpub in sorted_pairs])
else:
multisig = None
client = self.get_client(keystore)
client.show_address(address_path, script_type, multisig)
def tx_inputs(self, tx, xpub_path, for_sig=False):
inputs = []
for txin in tx.inputs():
txinputtype = self.types.TxInputType()
txinputtype = TxInputType()
if txin['type'] == 'coinbase':
prev_hash = b"\x00"*32
prev_index = 0xffffffff # signed int -1
else:
if for_sig:
x_pubkeys = txin['x_pubkeys']
if len(x_pubkeys) == 1:
x_pubkey = x_pubkeys[0]
xpub, s = parse_xpubkey(x_pubkey)
xpub_n = self.client_class.expand_path(self.xpub_path[xpub])
txinputtype._extend_address_n(xpub_n + s)
txinputtype.script_type = self.get_trezor_input_script_type(txin['type'])
else:
def f(x_pubkey):
xpub, s = parse_xpubkey(x_pubkey)
return self._make_node_path(xpub, s)
pubkeys = list(map(f, x_pubkeys))
multisig = self.types.MultisigRedeemScriptType(
pubkeys=pubkeys,
signatures=list(map(lambda x: bfh(x)[:-1] if x else b'', txin.get('signatures'))),
m=txin.get('num_sig'),
)
script_type = self.get_trezor_input_script_type(txin['type'])
txinputtype = self.types.TxInputType(
script_type=script_type,
multisig=multisig
)
# find which key is mine
for x_pubkey in x_pubkeys:
if is_xpubkey(x_pubkey):
xpub, s = parse_xpubkey(x_pubkey)
if xpub in self.xpub_path:
xpub_n = self.client_class.expand_path(self.xpub_path[xpub])
txinputtype._extend_address_n(xpub_n + s)
break
prev_hash = unhexlify(txin['prevout_hash'])
xpubs = [parse_xpubkey(x) for x in x_pubkeys]
multisig = self._make_multisig(txin.get('num_sig'), xpubs, txin.get('signatures'))
script_type = self.get_trezor_input_script_type(txin['type'])
txinputtype = TxInputType(
script_type=script_type,
multisig=multisig)
# find which key is mine
for xpub, deriv in xpubs:
if xpub in xpub_path:
xpub_n = parse_path(xpub_path[xpub])
txinputtype.address_n = xpub_n + deriv
break
prev_hash = bfh(txin['prevout_hash'])
prev_index = txin['prevout_n']
if 'value' in txin:
@ -412,39 +377,44 @@ class TrezorPlugin(HW_PluginBase):
return inputs
def _make_multisig(self, m, xpubs, signatures=None):
if len(xpubs) == 1:
return None
pubkeys = [self._make_node_path(xpub, deriv) for xpub, deriv in xpubs]
if signatures is None:
signatures = [b''] * len(pubkeys)
elif len(signatures) != len(pubkeys):
raise RuntimeError('Mismatched number of signatures')
else:
signatures = [bfh(x)[:-1] if x else b'' for x in signatures]
return MultisigRedeemScriptType(
pubkeys=pubkeys,
signatures=signatures,
m=m)
def tx_outputs(self, derivation, tx):
def create_output_by_derivation():
script_type = self.get_trezor_output_script_type(info.script_type)
if len(xpubs) == 1:
address_n = self.client_class.expand_path(derivation + "/%d/%d" % index)
txoutputtype = self.types.TxOutputType(
amount=amount,
script_type=script_type,
address_n=address_n,
)
else:
address_n = self.client_class.expand_path("/%d/%d" % index)
pubkeys = [self._make_node_path(xpub, address_n) for xpub in xpubs]
multisig = self.types.MultisigRedeemScriptType(
pubkeys=pubkeys,
signatures=[b''] * len(pubkeys),
m=m)
txoutputtype = self.types.TxOutputType(
multisig=multisig,
amount=amount,
address_n=self.client_class.expand_path(derivation + "/%d/%d" % index),
script_type=script_type)
deriv = parse_path("/%d/%d" % index)
multisig = self._make_multisig(m, [(xpub, deriv) for xpub in xpubs])
txoutputtype = TxOutputType(
multisig=multisig,
amount=amount,
address_n=parse_path(derivation + "/%d/%d" % index),
script_type=script_type)
return txoutputtype
def create_output_by_address():
txoutputtype = self.types.TxOutputType()
txoutputtype = TxOutputType()
txoutputtype.amount = amount
if _type == TYPE_SCRIPT:
txoutputtype.script_type = self.types.OutputScriptType.PAYTOOPRETURN
txoutputtype.script_type = OutputScriptType.PAYTOOPRETURN
txoutputtype.op_return_data = trezor_validate_op_return_output_and_get_data(o)
elif _type == TYPE_ADDRESS:
txoutputtype.script_type = self.types.OutputScriptType.PAYTOADDRESS
txoutputtype.script_type = OutputScriptType.PAYTOADDRESS
txoutputtype.address = address
return txoutputtype
@ -476,23 +446,17 @@ class TrezorPlugin(HW_PluginBase):
return outputs
def electrum_tx_to_txtype(self, tx):
t = self.types.TransactionType()
def electrum_tx_to_txtype(self, tx, xpub_path):
t = TransactionType()
if tx is None:
# probably for segwit input and we don't need this prev txn
return t
d = deserialize(tx.raw)
t.version = d['version']
t.lock_time = d['lockTime']
inputs = self.tx_inputs(tx)
t._extend_inputs(inputs)
for vout in d['outputs']:
o = t._add_bin_outputs()
o.amount = vout['value']
o.script_pubkey = bfh(vout['scriptPubKey'])
t.inputs = self.tx_inputs(tx, xpub_path)
t.bin_outputs = [
TxOutputBinType(amount=vout['value'], script_pubkey=bfh(vout['scriptPubKey']))
for vout in d['outputs']
]
return t
# This function is called from the TREZOR libraries (via tx_api)
def get_tx(self, tx_hash):
tx = self.prev_tx[tx_hash]
return self.electrum_tx_to_txtype(tx)

Loading…
Cancel
Save