You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

650 lines
29 KiB

# -*- coding: utf-8 -*-
#
# Electrum - lightweight Bitcoin client
# Copyright (C) 2016 Thomas Voegtlin
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import sys
import copy
import traceback
from functools import partial
from typing import List, TYPE_CHECKING, Tuple, NamedTuple, Any, Dict, Optional
from . import bitcoin
from . import keystore
from . import mnemonic
from .bip32 import is_bip32_derivation, xpub_type, normalize_bip32_derivation
from .keystore import bip44_derivation, purpose48_derivation
from .wallet import (Imported_Wallet, Standard_Wallet, Multisig_Wallet,
wallet_types, Wallet, Abstract_Wallet)
from .storage import (WalletStorage, STO_EV_USER_PW, STO_EV_XPUB_PW,
get_derivation_used_for_hw_device_encryption)
from .i18n import _
from .util import UserCancelled, InvalidPassword, WalletFileException
from .simple_config import SimpleConfig
from .plugin import Plugins, HardwarePluginLibraryUnavailable
from .logging import Logger
from .plugins.hw_wallet.plugin import OutdatedHwFirmwareException, HW_PluginBase
if TYPE_CHECKING:
from .plugin import DeviceInfo
# hardware device setup purpose
HWD_SETUP_NEW_WALLET, HWD_SETUP_DECRYPT_WALLET = range(0, 2)
class ScriptTypeNotSupported(Exception): pass
class GoBack(Exception): pass
class WizardStackItem(NamedTuple):
action: Any
args: Any
kwargs: Dict[str, Any]
storage_data: dict
class BaseWizard(Logger):
def __init__(self, config: SimpleConfig, plugins: Plugins):
super(BaseWizard, self).__init__()
Logger.__init__(self)
self.config = config
self.plugins = plugins
self.data = {}
self.pw_args = None
self._stack = [] # type: List[WizardStackItem]
self.plugin = None
self.keystores = []
self.is_kivy = config.get('gui') == 'kivy'
self.seed_type = None
def set_icon(self, icon):
pass
def run(self, *args, **kwargs):
action = args[0]
args = args[1:]
storage_data = copy.deepcopy(self.data)
self._stack.append(WizardStackItem(action, args, kwargs, storage_data))
if not action:
return
if type(action) is tuple:
self.plugin, action = action
if self.plugin and hasattr(self.plugin, action):
f = getattr(self.plugin, action)
f(self, *args, **kwargs)
elif hasattr(self, action):
f = getattr(self, action)
f(*args, **kwargs)
else:
raise Exception("unknown action", action)
def can_go_back(self):
return len(self._stack) > 1
def go_back(self):
if not self.can_go_back():
return
# pop 'current' frame
self._stack.pop()
# pop 'previous' frame
stack_item = self._stack.pop()
# try to undo side effects since we last entered 'previous' frame
# FIXME only self.storage is properly restored
self.data = copy.deepcopy(stack_item.storage_data)
# rerun 'previous' frame
self.run(stack_item.action, *stack_item.args, **stack_item.kwargs)
def reset_stack(self):
self._stack = []
def new(self):
title = _("Create new wallet")
message = '\n'.join([
_("What kind of wallet do you want to create?")
])
wallet_kinds = [
('standard', _("Standard wallet")),
('2fa', _("Wallet with two-factor authentication")),
('multisig', _("Multi-signature wallet")),
('imported', _("Import Bitcoin addresses or private keys")),
]
choices = [pair for pair in wallet_kinds if pair[0] in wallet_types]
self.choice_dialog(title=title, message=message, choices=choices, run_next=self.on_wallet_type)
def upgrade_storage(self, storage):
exc = None
def on_finished():
if exc is None:
self.terminate(storage=storage)
else:
raise exc
def do_upgrade():
nonlocal exc
try:
storage.upgrade()
except Exception as e:
exc = e
self.waiting_dialog(do_upgrade, _('Upgrading wallet format...'), on_finished=on_finished)
def load_2fa(self):
self.data['wallet_type'] = '2fa'
self.data['use_trustedcoin'] = True
self.plugin = self.plugins.load_plugin('trustedcoin')
def on_wallet_type(self, choice):
self.data['wallet_type'] = self.wallet_type = choice
if choice == 'standard':
action = 'choose_keystore'
elif choice == 'multisig':
action = 'choose_multisig'
elif choice == '2fa':
self.load_2fa()
action = self.plugin.get_action(self.data)
elif choice == 'imported':
action = 'import_addresses_or_keys'
self.run(action)
def choose_multisig(self):
def on_multisig(m, n):
multisig_type = "%dof%d" % (m, n)
self.data['wallet_type'] = multisig_type
self.n = n
self.run('choose_keystore')
self.multisig_dialog(run_next=on_multisig)
def choose_keystore(self):
assert self.wallet_type in ['standard', 'multisig']
i = len(self.keystores)
title = _('Add cosigner') + ' (%d of %d)'%(i+1, self.n) if self.wallet_type=='multisig' else _('Keystore')
if self.wallet_type =='standard' or i==0:
message = _('Do you want to create a new seed, or to restore a wallet using an existing seed?')
choices = [
('choose_seed_type', _('Create a new seed')),
('restore_from_seed', _('I already have a seed')),
('restore_from_key', _('Use a master key')),
]
if not self.is_kivy:
choices.append(('choose_hw_device', _('Use a hardware device')))
else:
message = _('Add a cosigner to your multi-sig wallet')
choices = [
('restore_from_key', _('Enter cosigner key')),
('restore_from_seed', _('Enter cosigner seed')),
]
if not self.is_kivy:
choices.append(('choose_hw_device', _('Cosign with hardware device')))
self.choice_dialog(title=title, message=message, choices=choices, run_next=self.run)
def import_addresses_or_keys(self):
v = lambda x: keystore.is_address_list(x) or keystore.is_private_key_list(x, raise_on_error=True)
title = _("Import Bitcoin Addresses")
message = _("Enter a list of Bitcoin addresses (this will create a watching-only wallet), or a list of private keys.")
self.add_xpub_dialog(title=title, message=message, run_next=self.on_import,
is_valid=v, allow_multi=True, show_wif_help=True)
def on_import(self, text):
# text is already sanitized by is_address_list and is_private_keys_list
if keystore.is_address_list(text):
self.data['addresses'] = {}
for addr in text.split():
assert bitcoin.is_address(addr)
self.data['addresses'][addr] = {}
elif keystore.is_private_key_list(text):
self.data['addresses'] = {}
k = keystore.Imported_KeyStore({})
keys = keystore.get_private_keys(text)
for pk in keys:
assert bitcoin.is_private_key(pk)
txin_type, pubkey = k.import_privkey(pk, None)
addr = bitcoin.pubkey_to_address(txin_type, pubkey)
self.data['addresses'][addr] = {'type':txin_type, 'pubkey':pubkey, 'redeem_script':None}
self.keystores.append(k)
else:
return self.terminate()
return self.run('create_wallet')
def restore_from_key(self):
if self.wallet_type == 'standard':
v = keystore.is_master_key
title = _("Create keystore from a master key")
message = ' '.join([
_("To create a watching-only wallet, please enter your master public key (xpub/ypub/zpub)."),
_("To create a spending wallet, please enter a master private key (xprv/yprv/zprv).")
])
self.add_xpub_dialog(title=title, message=message, run_next=self.on_restore_from_key, is_valid=v)
else:
i = len(self.keystores) + 1
self.add_cosigner_dialog(index=i, run_next=self.on_restore_from_key, is_valid=keystore.is_bip32_key)
def on_restore_from_key(self, text):
k = keystore.from_master_key(text)
self.on_keystore(k)
def choose_hw_device(self, purpose=HWD_SETUP_NEW_WALLET, *, storage=None):
title = _('Hardware Keystore')
# check available plugins
supported_plugins = self.plugins.get_hardware_support()
devices = [] # type: List[Tuple[str, DeviceInfo]]
devmgr = self.plugins.device_manager
debug_msg = ''
def failed_getting_device_infos(name, e):
nonlocal debug_msg
err_str_oneline = ' // '.join(str(e).splitlines())
self.logger.warning(f'error getting device infos for {name}: {err_str_oneline}')
indented_error_msg = ' '.join([''] + str(e).splitlines(keepends=True))
debug_msg += f' {name}: (error getting device infos)\n{indented_error_msg}\n'
# scan devices
try:
scanned_devices = devmgr.scan_devices()
except BaseException as e:
self.logger.info('error scanning devices: {}'.format(repr(e)))
debug_msg = ' {}:\n {}'.format(_('Error scanning devices'), e)
else:
for splugin in supported_plugins:
name, plugin = splugin.name, splugin.plugin
# plugin init errored?
if not plugin:
e = splugin.exception
indented_error_msg = ' '.join([''] + str(e).splitlines(keepends=True))
debug_msg += f' {name}: (error during plugin init)\n'
debug_msg += ' {}\n'.format(_('You might have an incompatible library.'))
debug_msg += f'{indented_error_msg}\n'
continue
# see if plugin recognizes 'scanned_devices'
try:
# FIXME: side-effect: unpaired_device_info sets client.handler
device_infos = devmgr.unpaired_device_infos(None, plugin, devices=scanned_devices,
include_failing_clients=True)
except HardwarePluginLibraryUnavailable as e:
failed_getting_device_infos(name, e)
continue
except BaseException as e:
self.logger.exception('')
failed_getting_device_infos(name, e)
continue
device_infos_failing = list(filter(lambda di: di.exception is not None, device_infos))
for di in device_infos_failing:
failed_getting_device_infos(name, di.exception)
device_infos_working = list(filter(lambda di: di.exception is None, device_infos))
devices += list(map(lambda x: (name, x), device_infos_working))
if not debug_msg:
debug_msg = ' {}'.format(_('No exceptions encountered.'))
if not devices:
msg = (_('No hardware device detected.') + '\n' +
_('To trigger a rescan, press \'Next\'.') + '\n\n')
if sys.platform == 'win32':
msg += _('If your device is not detected on Windows, go to "Settings", "Devices", "Connected devices", '
'and do "Remove device". Then, plug your device again.') + '\n'
msg += _('While this is less than ideal, it might help if you run Electrum as Administrator.') + '\n'
else:
msg += _('On Linux, you might have to add a new permission to your udev rules.') + '\n'
msg += '\n\n'
msg += _('Debug message') + '\n' + debug_msg
self.confirm_dialog(title=title, message=msg,
run_next=lambda x: self.choose_hw_device(purpose, storage=storage))
return
# select device
self.devices = devices
choices = []
for name, info in devices:
state = _("initialized") if info.initialized else _("wiped")
label = info.label or _("An unnamed {}").format(name)
try: transport_str = info.device.transport_ui_string[:20]
except: transport_str = 'unknown transport'
descr = f"{label} [{name}, {state}, {transport_str}]"
choices.append(((name, info), descr))
msg = _('Select a device') + ':'
self.choice_dialog(title=title, message=msg, choices=choices,
run_next=lambda *args: self.on_device(*args, purpose=purpose, storage=storage))
def on_device(self, name, device_info, *, purpose, storage=None):
self.plugin = self.plugins.get_plugin(name) # type: HW_PluginBase
try:
self.plugin.setup_device(device_info, self, purpose)
except OSError as e:
self.show_error(_('We encountered an error while connecting to your device:')
+ '\n' + str(e) + '\n'
+ _('To try to fix this, we will now re-pair with your device.') + '\n'
+ _('Please try again.'))
devmgr = self.plugins.device_manager
devmgr.unpair_id(device_info.device.id_)
self.choose_hw_device(purpose, storage=storage)
return
except OutdatedHwFirmwareException as e:
if self.question(e.text_ignore_old_fw_and_continue(), title=_("Outdated device firmware")):
self.plugin.set_ignore_outdated_fw()
# will need to re-pair
devmgr = self.plugins.device_manager
devmgr.unpair_id(device_info.device.id_)
self.choose_hw_device(purpose, storage=storage)
return
except (UserCancelled, GoBack):
self.choose_hw_device(purpose, storage=storage)
return
except BaseException as e:
self.logger.exception('')
self.show_error(str(e))
self.choose_hw_device(purpose, storage=storage)
return
if purpose == HWD_SETUP_NEW_WALLET:
def f(derivation, script_type):
derivation = normalize_bip32_derivation(derivation)
self.run('on_hw_derivation', name, device_info, derivation, script_type)
self.derivation_and_script_type_dialog(f)
elif purpose == HWD_SETUP_DECRYPT_WALLET:
derivation = get_derivation_used_for_hw_device_encryption()
xpub = self.plugin.get_xpub(device_info.device.id_, derivation, 'standard', self)
password = keystore.Xpub.get_pubkey_from_xpub(xpub, ())
try:
storage.decrypt(password)
except InvalidPassword:
# try to clear session so that user can type another passphrase
devmgr = self.plugins.device_manager
client = devmgr.client_by_id(device_info.device.id_)
if hasattr(client, 'clear_session'): # FIXME not all hw wallet plugins have this
client.clear_session()
raise
else:
raise Exception('unknown purpose: %s' % purpose)
def derivation_and_script_type_dialog(self, f):
message1 = _('Choose the type of addresses in your wallet.')
message2 = ' '.join([
_('You can override the suggested derivation path.'),
_('If you are not sure what this is, leave this field unchanged.')
])
if self.wallet_type == 'multisig':
# There is no general standard for HD multisig.
# For legacy, this is partially compatible with BIP45; assumes index=0
# For segwit, a custom path is used, as there is no standard at all.
default_choice_idx = 2
choices = [
('standard', 'legacy multisig (p2sh)', "m/45'/0"),
('p2wsh-p2sh', 'p2sh-segwit multisig (p2wsh-p2sh)', purpose48_derivation(0, xtype='p2wsh-p2sh')),
('p2wsh', 'native segwit multisig (p2wsh)', purpose48_derivation(0, xtype='p2wsh')),
]
else:
default_choice_idx = 2
choices = [
('standard', 'legacy (p2pkh)', bip44_derivation(0, bip43_purpose=44)),
('p2wpkh-p2sh', 'p2sh-segwit (p2wpkh-p2sh)', bip44_derivation(0, bip43_purpose=49)),
('p2wpkh', 'native segwit (p2wpkh)', bip44_derivation(0, bip43_purpose=84)),
]
while True:
try:
self.choice_and_line_dialog(
run_next=f, title=_('Script type and Derivation path'), message1=message1,
message2=message2, choices=choices, test_text=is_bip32_derivation,
default_choice_idx=default_choice_idx)
return
except ScriptTypeNotSupported as e:
self.show_error(e)
# let the user choose again
def on_hw_derivation(self, name, device_info, derivation, xtype):
from .keystore import hardware_keystore
try:
xpub = self.plugin.get_xpub(device_info.device.id_, derivation, xtype, self)
except ScriptTypeNotSupported:
raise # this is handled in derivation_dialog
except BaseException as e:
self.logger.exception('')
self.show_error(e)
return
d = {
'type': 'hardware',
'hw_type': name,
'derivation': derivation,
'xpub': xpub,
'label': device_info.label,
}
k = hardware_keystore(d)
self.on_keystore(k)
def passphrase_dialog(self, run_next, is_restoring=False):
title = _('Seed extension')
message = '\n'.join([
_('You may extend your seed with custom words.'),
_('Your seed extension must be saved together with your seed.'),
])
warning = '\n'.join([
_('Note that this is NOT your encryption password.'),
_('If you do not know what this is, leave this field empty.'),
])
warn_issue4566 = is_restoring and self.seed_type == 'bip39'
self.line_dialog(title=title, message=message, warning=warning,
default='', test=lambda x:True, run_next=run_next,
warn_issue4566=warn_issue4566)
def restore_from_seed(self):
self.opt_bip39 = True
self.opt_ext = True
is_cosigning_seed = lambda x: mnemonic.seed_type(x) in ['standard', 'segwit']
test = mnemonic.is_seed if self.wallet_type == 'standard' else is_cosigning_seed
self.restore_seed_dialog(run_next=self.on_restore_seed, test=test)
def on_restore_seed(self, seed, is_bip39, is_ext):
self.seed_type = 'bip39' if is_bip39 else mnemonic.seed_type(seed)
if self.seed_type == 'bip39':
f = lambda passphrase: self.on_restore_bip39(seed, passphrase)
self.passphrase_dialog(run_next=f, is_restoring=True) if is_ext else f('')
elif self.seed_type in ['standard', 'segwit']:
f = lambda passphrase: self.run('create_keystore', seed, passphrase)
self.passphrase_dialog(run_next=f, is_restoring=True) if is_ext else f('')
elif self.seed_type == 'old':
self.run('create_keystore', seed, '')
elif mnemonic.is_any_2fa_seed_type(self.seed_type):
self.load_2fa()
self.run('on_restore_seed', seed, is_ext)
else:
raise Exception('Unknown seed type', self.seed_type)
def on_restore_bip39(self, seed, passphrase):
def f(derivation, script_type):
derivation = normalize_bip32_derivation(derivation)
self.run('on_bip43', seed, passphrase, derivation, script_type)
self.derivation_and_script_type_dialog(f)
def create_keystore(self, seed, passphrase):
k = keystore.from_seed(seed, passphrase, self.wallet_type == 'multisig')
self.on_keystore(k)
def on_bip43(self, seed, passphrase, derivation, script_type):
k = keystore.from_bip39_seed(seed, passphrase, derivation, xtype=script_type)
self.on_keystore(k)
def on_keystore(self, k):
has_xpub = isinstance(k, keystore.Xpub)
if has_xpub:
t1 = xpub_type(k.xpub)
if self.wallet_type == 'standard':
if has_xpub and t1 not in ['standard', 'p2wpkh', 'p2wpkh-p2sh']:
self.show_error(_('Wrong key type') + ' %s'%t1)
self.run('choose_keystore')
return
self.keystores.append(k)
self.run('create_wallet')
elif self.wallet_type == 'multisig':
assert has_xpub
if t1 not in ['standard', 'p2wsh', 'p2wsh-p2sh']:
self.show_error(_('Wrong key type') + ' %s'%t1)
self.run('choose_keystore')
return
if k.xpub in map(lambda x: x.xpub, self.keystores):
self.show_error(_('Error: duplicate master public key'))
self.run('choose_keystore')
return
if len(self.keystores)>0:
t2 = xpub_type(self.keystores[0].xpub)
if t1 != t2:
self.show_error(_('Cannot add this cosigner:') + '\n' + "Their key type is '%s', we are '%s'"%(t1, t2))
self.run('choose_keystore')
return
self.keystores.append(k)
if len(self.keystores) == 1:
xpub = k.get_master_public_key()
self.reset_stack()
self.run('show_xpub_and_add_cosigners', xpub)
elif len(self.keystores) < self.n:
self.run('choose_keystore')
else:
self.run('create_wallet')
def create_wallet(self):
encrypt_keystore = any(k.may_have_password() for k in self.keystores)
# note: the following condition ("if") is duplicated logic from
# wallet.get_available_storage_encryption_version()
if self.wallet_type == 'standard' and isinstance(self.keystores[0], keystore.Hardware_KeyStore):
# offer encrypting with a pw derived from the hw device
k = self.keystores[0]
try:
k.handler = self.plugin.create_handler(self)
password = k.get_password_for_storage_encryption()
except UserCancelled:
devmgr = self.plugins.device_manager
devmgr.unpair_xpub(k.xpub)
self.choose_hw_device()
return
except BaseException as e:
self.logger.exception('')
self.show_error(str(e))
return
self.request_storage_encryption(
run_next=lambda encrypt_storage: self.on_password(
password,
encrypt_storage=encrypt_storage,
storage_enc_version=STO_EV_XPUB_PW,
encrypt_keystore=False))
else:
# reset stack to disable 'back' button in password dialog
self.reset_stack()
# prompt the user to set an arbitrary password
self.request_password(
run_next=lambda password, encrypt_storage: self.on_password(
password,
encrypt_storage=encrypt_storage,
storage_enc_version=STO_EV_USER_PW,
encrypt_keystore=encrypt_keystore),
force_disable_encrypt_cb=not encrypt_keystore)
def on_password(self, password, *, encrypt_storage,
storage_enc_version=STO_EV_USER_PW, encrypt_keystore):
for k in self.keystores:
if k.may_have_password():
k.update_password(None, password)
if self.wallet_type == 'standard':
self.data['seed_type'] = self.seed_type
keys = self.keystores[0].dump()
self.data['keystore'] = keys
elif self.wallet_type == 'multisig':
for i, k in enumerate(self.keystores):
self.data['x%d/'%(i+1)] = k.dump()
elif self.wallet_type == 'imported':
if len(self.keystores) > 0:
keys = self.keystores[0].dump()
self.data['keystore'] = keys
else:
raise Exception('Unknown wallet type')
self.pw_args = password, encrypt_storage, storage_enc_version
self.terminate()
def create_storage(self, path):
if os.path.exists(path):
raise Exception('file already exists at path')
if not self.pw_args:
return
password, encrypt_storage, storage_enc_version = self.pw_args
self.pw_args = None # clean-up so that it can get GC-ed
storage = WalletStorage(path)
storage.set_keystore_encryption(bool(password))
if encrypt_storage:
storage.set_password(password, enc_version=storage_enc_version)
for key, value in self.data.items():
storage.put(key, value)
storage.write()
storage.load_plugins()
return storage
def terminate(self, *, storage: Optional[WalletStorage] = None):
raise NotImplementedError() # implemented by subclasses
def show_xpub_and_add_cosigners(self, xpub):
self.show_xpub_dialog(xpub=xpub, run_next=lambda x: self.run('choose_keystore'))
def choose_seed_type(self, message=None, choices=None):
title = _('Choose Seed type')
if message is None:
message = ' '.join([
_("The type of addresses used by your wallet will depend on your seed."),
_("Segwit wallets use bech32 addresses, defined in BIP173."),
_("Please note that websites and other wallets may not support these addresses yet."),
_("Thus, you might want to keep using a non-segwit wallet in order to be able to receive bitcoins during the transition period.")
])
if choices is None:
choices = [
('create_segwit_seed', _('Segwit')),
('create_standard_seed', _('Legacy')),
]
self.choice_dialog(title=title, message=message, choices=choices, run_next=self.run)
def create_segwit_seed(self): self.create_seed('segwit')
def create_standard_seed(self): self.create_seed('standard')
def create_seed(self, seed_type):
from . import mnemonic
self.seed_type = seed_type
seed = mnemonic.Mnemonic('en').make_seed(self.seed_type)
self.opt_bip39 = False
f = lambda x: self.request_passphrase(seed, x)
self.show_seed_dialog(run_next=f, seed_text=seed)
def request_passphrase(self, seed, opt_passphrase):
if opt_passphrase:
f = lambda x: self.confirm_seed(seed, x)
self.passphrase_dialog(run_next=f)
else:
self.run('confirm_seed', seed, '')
def confirm_seed(self, seed, passphrase):
f = lambda x: self.confirm_passphrase(seed, passphrase)
self.confirm_seed_dialog(run_next=f, test=lambda x: x==seed)
def confirm_passphrase(self, seed, passphrase):
f = lambda x: self.run('create_keystore', seed, x)
if passphrase:
title = _('Confirm Seed Extension')
message = '\n'.join([
_('Your seed extension must be saved together with your seed.'),
_('Please type it here.'),
])
self.line_dialog(run_next=f, title=title, message=message, default='', test=lambda x: x==passphrase)
else:
f('')