#!/usr/bin/env python # # Electrum - Lightweight Bitcoin Client # Copyright (C) 2015 Thomas Voegtlin # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import socket import os import requests import json from urllib.parse import urljoin from urllib.parse import quote import electrum from electrum import bitcoin from electrum import keystore from electrum.bitcoin import * from electrum.mnemonic import Mnemonic from electrum import version from electrum.wallet import Multisig_Wallet, Deterministic_Wallet from electrum.i18n import _ from electrum.plugins import BasePlugin, hook from electrum.util import NotEnoughFunds from electrum.storage import STO_EV_USER_PW # signing_xpub is hardcoded so that the wallet can be restored from seed, without TrustedCoin's server signing_xpub = "xpub661MyMwAqRbcGnMkaTx2594P9EDuiEqMq25PM2aeG6UmwzaohgA6uDmNsvSUV8ubqwA3Wpste1hg69XHgjUuCD5HLcEp2QPzyV1HMrPppsL" billing_xpub = "xpub6DTBdtBB8qUmH5c77v8qVGVoYk7WjJNpGvutqjLasNG1mbux6KsojaLrYf2sRhXAVU4NaFuHhbD9SvVPRt1MB1MaMooRuhHcAZH1yhQ1qDU" SEED_PREFIX = version.SEED_PREFIX_2FA DISCLAIMER = [ _("Two-factor authentication is a service provided by TrustedCoin. " "It uses a multi-signature wallet, where you own 2 of 3 keys. " "The third key is stored on a remote server that signs transactions on " "your behalf. To use this service, you will need a smartphone with " "Google Authenticator installed."), _("A small fee will be charged on each transaction that uses the " "remote server. You may check and modify your billing preferences " "once the installation is complete."), _("Note that your coins are not locked in this service. You may withdraw " "your funds at any time and at no cost, without the remote server, by " "using the 'restore wallet' option with your wallet seed."), _("The next step will generate the seed of your wallet. This seed will " "NOT be saved in your computer, and it must be stored on paper. " "To be safe from malware, you may want to do this on an offline " "computer, and move your wallet later to an online computer."), ] RESTORE_MSG = _("Enter the seed for your 2-factor wallet:") class TrustedCoinException(Exception): def __init__(self, message, status_code=0): Exception.__init__(self, message) self.status_code = status_code class TrustedCoinCosignerClient(object): def __init__(self, user_agent=None, base_url='https://api.trustedcoin.com/2/'): self.base_url = base_url self.debug = False self.user_agent = user_agent def send_request(self, method, relative_url, data=None): kwargs = {'headers': {}} if self.user_agent: kwargs['headers']['user-agent'] = self.user_agent if method == 'get' and data: kwargs['params'] = data elif method == 'post' and data: kwargs['data'] = json.dumps(data) kwargs['headers']['content-type'] = 'application/json' url = urljoin(self.base_url, relative_url) if self.debug: print('%s %s %s' % (method, url, data)) response = requests.request(method, url, **kwargs) if self.debug: print(response.text) if response.status_code != 200: message = str(response.text) if response.headers.get('content-type') == 'application/json': r = response.json() if 'message' in r: message = r['message'] raise TrustedCoinException(message, response.status_code) if response.headers.get('content-type') == 'application/json': return response.json() else: return response.text def get_terms_of_service(self, billing_plan='electrum-per-tx-otp'): """ Returns the TOS for the given billing plan as a plain/text unicode string. :param billing_plan: the plan to return the terms for """ payload = {'billing_plan': billing_plan} return self.send_request('get', 'tos', payload) def create(self, xpubkey1, xpubkey2, email, billing_plan='electrum-per-tx-otp'): """ Creates a new cosigner resource. :param xpubkey1: a bip32 extended public key (customarily the hot key) :param xpubkey2: a bip32 extended public key (customarily the cold key) :param email: a contact email :param billing_plan: the billing plan for the cosigner """ payload = { 'email': email, 'xpubkey1': xpubkey1, 'xpubkey2': xpubkey2, 'billing_plan': billing_plan, } return self.send_request('post', 'cosigner', payload) def auth(self, id, otp): """ Attempt to authenticate for a particular cosigner. :param id: the id of the cosigner :param otp: the one time password """ payload = {'otp': otp} return self.send_request('post', 'cosigner/%s/auth' % quote(id), payload) def get(self, id): """ Get billing info """ return self.send_request('get', 'cosigner/%s' % quote(id)) def get_challenge(self, id): """ Get challenge to reset Google Auth secret """ return self.send_request('get', 'cosigner/%s/otp_secret' % quote(id)) def reset_auth(self, id, challenge, signatures): """ Reset Google Auth secret """ payload = {'challenge':challenge, 'signatures':signatures} return self.send_request('post', 'cosigner/%s/otp_secret' % quote(id), payload) def sign(self, id, transaction, otp): """ Attempt to authenticate for a particular cosigner. :param id: the id of the cosigner :param transaction: the hex encoded [partially signed] compact transaction to sign :param otp: the one time password """ payload = { 'otp': otp, 'transaction': transaction } return self.send_request('post', 'cosigner/%s/sign' % quote(id), payload) def transfer_credit(self, id, recipient, otp, signature_callback): """ Tranfer a cosigner's credits to another cosigner. :param id: the id of the sending cosigner :param recipient: the id of the recipient cosigner :param otp: the one time password (of the sender) :param signature_callback: a callback that signs a text message using xpubkey1/0/0 returning a compact sig """ payload = { 'otp': otp, 'recipient': recipient, 'timestamp': int(time.time()), } relative_url = 'cosigner/%s/transfer' % quote(id) full_url = urljoin(self.base_url, relative_url) headers = { 'x-signature': signature_callback(full_url + '\n' + json.dumps(payload)) } return self.send_request('post', relative_url, payload, headers) server = TrustedCoinCosignerClient(user_agent="Electrum/" + version.ELECTRUM_VERSION) class Wallet_2fa(Multisig_Wallet): wallet_type = '2fa' def __init__(self, storage): self.m, self.n = 2, 3 Deterministic_Wallet.__init__(self, storage) self.is_billing = False self.billing_info = None def can_sign_without_server(self): return not self.keystores['x2/'].is_watching_only() def get_user_id(self): return get_user_id(self.storage) def get_max_amount(self, config, inputs, recipient, fee): from electrum.transaction import Transaction sendable = sum(map(lambda x:x['value'], inputs)) for i in inputs: self.add_input_info(i) xf = self.extra_fee(config) _type, addr = recipient if xf and sendable >= xf: billing_address = self.billing_info['billing_address'] sendable -= xf outputs = [(_type, addr, sendable), (TYPE_ADDRESS, billing_address, xf)] else: outputs = [(_type, addr, sendable)] dummy_tx = Transaction.from_io(inputs, outputs) if fee is None: fee = self.estimate_fee(config, dummy_tx.estimated_size()) amount = max(0, sendable - fee) return amount, fee def min_prepay(self): return min(self.price_per_tx.keys()) def num_prepay(self, config): default = self.min_prepay() n = config.get('trustedcoin_prepay', default) if n not in self.price_per_tx: n = default return n def extra_fee(self, config): if self.can_sign_without_server(): return 0 if self.billing_info is None: self.plugin.start_request_thread(self) return 0 if self.billing_info.get('tx_remaining'): return 0 if self.is_billing: return 0 n = self.num_prepay(config) price = int(self.price_per_tx[n]) assert price <= 100000 * n return price def make_unsigned_transaction(self, coins, outputs, config, fixed_fee=None, change_addr=None, is_sweep=False): mk_tx = lambda o: Multisig_Wallet.make_unsigned_transaction( self, coins, o, config, fixed_fee, change_addr) fee = self.extra_fee(config) if not is_sweep else 0 if fee: address = self.billing_info['billing_address'] fee_output = (TYPE_ADDRESS, address, fee) try: tx = mk_tx(outputs + [fee_output]) except NotEnoughFunds: # trustedcoin won't charge if the total inputs is # lower than their fee tx = mk_tx(outputs) if tx.input_value() >= fee: raise self.print_error("not charging for this tx") else: tx = mk_tx(outputs) return tx def sign_transaction(self, tx, password): Multisig_Wallet.sign_transaction(self, tx, password) if tx.is_complete(): return if not self.auth_code: self.print_error("sign_transaction: no auth code") return long_user_id, short_id = self.get_user_id() tx_dict = tx.as_dict() raw_tx = tx_dict["hex"] r = server.sign(short_id, raw_tx, self.auth_code) if r: raw_tx = r.get('transaction') tx.update(raw_tx) self.print_error("twofactor: is complete", tx.is_complete()) # reset billing_info self.billing_info = None # Utility functions def get_user_id(storage): def make_long_id(xpub_hot, xpub_cold): return bitcoin.sha256(''.join(sorted([xpub_hot, xpub_cold]))) xpub1 = storage.get('x1/')['xpub'] xpub2 = storage.get('x2/')['xpub'] long_id = make_long_id(xpub1, xpub2) short_id = hashlib.sha256(long_id).hexdigest() return long_id, short_id def make_xpub(xpub, s): version, _, _, _, c, cK = deserialize_xpub(xpub) cK2, c2 = bitcoin._CKD_pub(cK, c, s) return bitcoin.serialize_xpub(version, c2, cK2) def make_billing_address(wallet, num): long_id, short_id = wallet.get_user_id() xpub = make_xpub(billing_xpub, long_id) version, _, _, _, c, cK = deserialize_xpub(xpub) cK, c = bitcoin.CKD_pub(cK, c, num) return bitcoin.public_key_to_p2pkh(cK) class TrustedCoinPlugin(BasePlugin): wallet_class = Wallet_2fa def __init__(self, parent, config, name): BasePlugin.__init__(self, parent, config, name) self.wallet_class.plugin = self self.requesting = False @staticmethod def is_valid_seed(seed): return bitcoin.is_new_seed(seed, SEED_PREFIX) def is_available(self): return True def is_enabled(self): return True def can_user_disable(self): return False @hook def get_tx_extra_fee(self, wallet, tx): if type(wallet) != Wallet_2fa: return address = wallet.billing_info['billing_address'] for _type, addr, amount in tx.outputs(): if _type == TYPE_ADDRESS and addr == address: return address, amount def request_billing_info(self, wallet): self.print_error("request billing info") billing_info = server.get(wallet.get_user_id()[1]) billing_address = make_billing_address(wallet, billing_info['billing_index']) assert billing_address == billing_info['billing_address'] wallet.billing_info = billing_info wallet.price_per_tx = dict(billing_info['price_per_tx']) wallet.price_per_tx.pop(1) self.requesting = False return True def start_request_thread(self, wallet): from threading import Thread if self.requesting is False: self.requesting = True t = Thread(target=self.request_billing_info, args=(wallet,)) t.setDaemon(True) t.start() return t def make_seed(self): return Mnemonic('english').make_seed(seed_type='2fa', num_bits=128) @hook def do_clear(self, window): window.wallet.is_billing = False def show_disclaimer(self, wizard): wizard.set_icon(':icons/trustedcoin-wizard.png') wizard.stack = [] wizard.confirm_dialog(title='Disclaimer', message='\n\n'.join(DISCLAIMER), run_next = lambda x: wizard.run('choose_seed')) def choose_seed(self, wizard): title = _('Create or restore') message = _('Do you want to create a new seed, or to restore a wallet using an existing seed?') choices = [ ('create_seed', _('Create a new seed')), ('restore_wallet', _('I already have a seed')), ] wizard.choice_dialog(title=title, message=message, choices=choices, run_next=wizard.run) def create_seed(self, wizard): seed = self.make_seed() f = lambda x: wizard.request_passphrase(seed, x) wizard.show_seed_dialog(run_next=f, seed_text=seed) @classmethod def get_xkeys(self, seed, passphrase, derivation): from electrum.mnemonic import Mnemonic from electrum.keystore import bip32_root, bip32_private_derivation bip32_seed = Mnemonic.mnemonic_to_seed(seed, passphrase) xprv, xpub = bip32_root(bip32_seed, 'standard') xprv, xpub = bip32_private_derivation(xprv, "m/", derivation) return xprv, xpub @classmethod def xkeys_from_seed(self, seed, passphrase): words = seed.split() n = len(words) # old version use long seed phrases if n >= 24: assert passphrase == '' xprv1, xpub1 = self.get_xkeys(' '.join(words[0:12]), '', "m/") xprv2, xpub2 = self.get_xkeys(' '.join(words[12:]), '', "m/") elif n==12: xprv1, xpub1 = self.get_xkeys(seed, passphrase, "m/0'/") xprv2, xpub2 = self.get_xkeys(seed, passphrase, "m/1'/") else: raise BaseException('unrecognized seed length') return xprv1, xpub1, xprv2, xpub2 def create_keystore(self, wizard, seed, passphrase): # this overloads the wizard's method xprv1, xpub1, xprv2, xpub2 = self.xkeys_from_seed(seed, passphrase) k1 = keystore.from_xprv(xprv1) k2 = keystore.from_xpub(xpub2) wizard.request_password(run_next=lambda pw, encrypt: self.on_password(wizard, pw, encrypt, k1, k2)) def on_password(self, wizard, password, encrypt_storage, k1, k2): k1.update_password(None, password) wizard.storage.set_keystore_encryption(bool(password)) if encrypt_storage: wizard.storage.set_password(password, enc_version=STO_EV_USER_PW) wizard.storage.put('x1/', k1.dump()) wizard.storage.put('x2/', k2.dump()) wizard.storage.write() msg = [ _("Your wallet file is: {}.").format(os.path.abspath(wizard.storage.path)), _("You need to be online in order to complete the creation of " "your wallet. If you generated your seed on an offline " 'computer, click on "{}" to close this window, move your ' "wallet file to an online computer, and reopen it with " "Electrum.").format(_('Cancel')), _('If you are online, click on "{}" to continue.').format(_('Next')) ] msg = '\n\n'.join(msg) wizard.stack = [] wizard.confirm_dialog(title='', message=msg, run_next = lambda x: wizard.run('create_remote_key')) def restore_wallet(self, wizard): wizard.opt_bip39 = False wizard.opt_ext = True title = _("Restore two-factor Wallet") f = lambda seed, is_bip39, is_ext: wizard.run('on_restore_seed', seed, is_ext) wizard.restore_seed_dialog(run_next=f, test=self.is_valid_seed) def on_restore_seed(self, wizard, seed, is_ext): f = lambda x: self.restore_choice(wizard, seed, x) wizard.passphrase_dialog(run_next=f) if is_ext else f('') def restore_choice(self, wizard, seed, passphrase): wizard.set_icon(':icons/trustedcoin-wizard.png') wizard.stack = [] title = _('Restore 2FA wallet') msg = ' '.join([ 'You are going to restore a wallet protected with two-factor authentication.', 'Do you want to keep using two-factor authentication with this wallet,', 'or do you want to disable it, and have two master private keys in your wallet?' ]) choices = [('keep', 'Keep'), ('disable', 'Disable')] f = lambda x: self.on_choice(wizard, seed, passphrase, x) wizard.choice_dialog(choices=choices, message=msg, title=title, run_next=f) def on_choice(self, wizard, seed, passphrase, x): if x == 'disable': f = lambda pw, encrypt: wizard.run('on_restore_pw', seed, passphrase, pw, encrypt) wizard.request_password(run_next=f) else: self.create_keystore(wizard, seed, passphrase) def on_restore_pw(self, wizard, seed, passphrase, password, encrypt_storage): storage = wizard.storage xprv1, xpub1, xprv2, xpub2 = self.xkeys_from_seed(seed, passphrase) k1 = keystore.from_xprv(xprv1) k2 = keystore.from_xprv(xprv2) k1.add_seed(seed) k1.update_password(None, password) k2.update_password(None, password) storage.put('x1/', k1.dump()) storage.put('x2/', k2.dump()) long_user_id, short_id = get_user_id(storage) xpub3 = make_xpub(signing_xpub, long_user_id) k3 = keystore.from_xpub(xpub3) storage.put('x3/', k3.dump()) storage.set_keystore_encryption(bool(password)) if encrypt_storage: storage.set_password(password, enc_version=STO_EV_USER_PW) wizard.wallet = Wallet_2fa(storage) wizard.create_addresses() def create_remote_key(self, wizard): email = self.accept_terms_of_use(wizard) xpub1 = wizard.storage.get('x1/')['xpub'] xpub2 = wizard.storage.get('x2/')['xpub'] # Generate third key deterministically. long_user_id, short_id = get_user_id(wizard.storage) xpub3 = make_xpub(signing_xpub, long_user_id) # secret must be sent by the server try: r = server.create(xpub1, xpub2, email) except socket.error: wizard.show_message('Server not reachable, aborting') return except TrustedCoinException as e: if e.status_code == 409: r = None else: wizard.show_message(str(e)) return if r is None: otp_secret = None else: otp_secret = r.get('otp_secret') if not otp_secret: wizard.show_message(_('Error')) return _xpub3 = r['xpubkey_cosigner'] _id = r['id'] try: assert _id == short_id, ("user id error", _id, short_id) assert xpub3 == _xpub3, ("xpub3 error", xpub3, _xpub3) except Exception as e: wizard.show_message(str(e)) return self.check_otp(wizard, short_id, otp_secret, xpub3) def check_otp(self, wizard, short_id, otp_secret, xpub3): otp, reset = self.request_otp_dialog(wizard, short_id, otp_secret) if otp: self.do_auth(wizard, short_id, otp, xpub3) elif reset: wizard.opt_bip39 = False wizard.opt_ext = True f = lambda seed, is_bip39, is_ext: wizard.run('on_reset_seed', short_id, seed, is_ext, xpub3) wizard.restore_seed_dialog(run_next=f, test=self.is_valid_seed) def on_reset_seed(self, wizard, short_id, seed, is_ext, xpub3): f = lambda passphrase: wizard.run('on_reset_auth', short_id, seed, passphrase, xpub3) wizard.passphrase_dialog(run_next=f) if is_ext else f('') def do_auth(self, wizard, short_id, otp, xpub3): try: server.auth(short_id, otp) except: wizard.show_message(_('Incorrect password')) return k3 = keystore.from_xpub(xpub3) wizard.storage.put('x3/', k3.dump()) wizard.storage.put('use_trustedcoin', True) wizard.storage.write() wizard.wallet = Wallet_2fa(wizard.storage) wizard.run('create_addresses') def on_reset_auth(self, wizard, short_id, seed, passphrase, xpub3): xprv1, xpub1, xprv2, xpub2 = self.xkeys_from_seed(seed, passphrase) try: assert xpub1 == wizard.storage.get('x1/')['xpub'] assert xpub2 == wizard.storage.get('x2/')['xpub'] except: wizard.show_message(_('Incorrect seed')) return r = server.get_challenge(short_id) challenge = r.get('challenge') message = 'TRUSTEDCOIN CHALLENGE: ' + challenge def f(xprv): _, _, _, _, c, k = deserialize_xprv(xprv) pk = bip32_private_key([0, 0], k, c) key = regenerate_key(pk) sig = key.sign_message(message, True) return base64.b64encode(sig).decode() signatures = [f(x) for x in [xprv1, xprv2]] r = server.reset_auth(short_id, challenge, signatures) new_secret = r.get('otp_secret') if not new_secret: wizard.show_message(_('Request rejected by server')) return self.check_otp(wizard, short_id, new_secret, xpub3) @hook def get_action(self, storage): if storage.get('wallet_type') != '2fa': return if not storage.get('x1/'): return self, 'show_disclaimer' if not storage.get('x2/'): return self, 'show_disclaimer' if not storage.get('x3/'): return self, 'create_remote_key'