Browse Source

Merge pull request #4838 from SomberNight/keystore_pw_hash2b

keystore: stronger pbkdf for encryption
3.3.3.1
ThomasV 6 years ago
committed by GitHub
parent
commit
2484c52611
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 95
      electrum/crypto.py
  2. 58
      electrum/keystore.py
  3. 8
      electrum/plugins/digitalbitbox/digitalbitbox.py
  4. 27
      electrum/tests/test_bitcoin.py

95
electrum/crypto.py

@ -32,6 +32,7 @@ from typing import Union
import pyaes
from .util import assert_bytes, InvalidPassword, to_bytes, to_string
from .i18n import _
try:
@ -90,37 +91,103 @@ def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
raise InvalidPassword()
def EncodeAES(secret: bytes, msg: bytes) -> bytes:
def EncodeAES_base64(secret: bytes, msg: bytes) -> bytes:
"""Returns base64 encoded ciphertext."""
e = EncodeAES_bytes(secret, msg)
return base64.b64encode(e)
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
assert_bytes(msg)
iv = bytes(os.urandom(16))
ct = aes_encrypt_with_iv(secret, iv, msg)
e = iv + ct
return base64.b64encode(e)
return iv + ct
def DecodeAES_base64(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
ciphertext = bytes(base64.b64decode(ciphertext_b64))
return DecodeAES_bytes(secret, ciphertext)
def DecodeAES(secret: bytes, ciphertext_b64: Union[bytes, str]) -> bytes:
e = bytes(base64.b64decode(ciphertext_b64))
iv, e = e[:16], e[16:]
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
assert_bytes(ciphertext)
iv, e = ciphertext[:16], ciphertext[16:]
s = aes_decrypt_with_iv(secret, iv, e)
return s
def pw_encode(data: str, password: Union[bytes, str]) -> str:
PW_HASH_VERSION_LATEST = 2
KNOWN_PW_HASH_VERSIONS = (1, 2)
assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
class UnexpectedPasswordHashVersion(InvalidPassword):
def __init__(self, version):
self.version = version
def __str__(self):
return "{unexpected}: {version}\n{please_update}".format(
unexpected=_("Unexpected password hash version"),
version=self.version,
please_update=_('You are most likely using an outdated version of Electrum. Please update.'))
def _hash_password(password: Union[bytes, str], *, version: int, salt: bytes) -> bytes:
pw = to_bytes(password, 'utf8')
if version == 1:
return sha256d(pw)
elif version == 2:
if not isinstance(salt, bytes) or len(salt) < 16:
raise Exception('too weak salt', salt)
return hashlib.pbkdf2_hmac(hash_name='sha256',
password=pw,
salt=b'ELECTRUM_PW_HASH_V2'+salt,
iterations=50_000)
else:
assert version not in KNOWN_PW_HASH_VERSIONS
raise UnexpectedPasswordHashVersion(version)
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
if not password:
return data
secret = sha256d(password)
return EncodeAES(secret, to_bytes(data, "utf8")).decode('utf8')
if version not in KNOWN_PW_HASH_VERSIONS:
raise UnexpectedPasswordHashVersion(version)
# derive key from password
if version == 1:
salt = b''
elif version == 2:
salt = bytes(os.urandom(16))
else:
assert False, version
secret = _hash_password(password, version=version, salt=salt)
# encrypt given data
e = EncodeAES_bytes(secret, to_bytes(data, "utf8"))
# return base64(salt + encrypted data)
ciphertext = salt + e
ciphertext_b64 = base64.b64encode(ciphertext)
return ciphertext_b64.decode('utf8')
def pw_decode(data: str, password: Union[bytes, str]) -> str:
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
if password is None:
return data
secret = sha256d(password)
if version not in KNOWN_PW_HASH_VERSIONS:
raise UnexpectedPasswordHashVersion(version)
data_bytes = bytes(base64.b64decode(data))
# derive key from password
if version == 1:
salt = b''
elif version == 2:
salt, data_bytes = data_bytes[:16], data_bytes[16:]
else:
assert False, version
secret = _hash_password(password, version=version, salt=salt)
# decrypt given data
try:
d = to_string(DecodeAES(secret, data), "utf8")
except Exception:
raise InvalidPassword()
d = to_string(DecodeAES_bytes(secret, data_bytes), "utf8")
except Exception as e:
raise InvalidPassword() from e
return d

58
electrum/keystore.py

@ -35,7 +35,7 @@ from .bip32 import (bip32_public_derivation, deserialize_xpub, CKD_pub,
bip32_private_key, bip32_derivation, BIP32_PRIME,
is_xpub, is_xprv)
from .ecc import string_to_number, number_to_string
from .crypto import pw_decode, pw_encode, sha256d
from .crypto import (pw_decode, pw_encode, sha256d, PW_HASH_VERSION_LATEST)
from .util import (PrintError, InvalidPassword, hfu, WalletFileException,
BitcoinException, bh2u, bfh, print_error, inv_dict)
from .mnemonic import Mnemonic, load_wordlist
@ -92,8 +92,9 @@ class KeyStore(PrintError):
class Software_KeyStore(KeyStore):
def __init__(self):
def __init__(self, d):
KeyStore.__init__(self)
self.pw_hash_version = d.get('pw_hash_version', 1)
def may_have_password(self):
return not self.is_watching_only()
@ -122,6 +123,12 @@ class Software_KeyStore(KeyStore):
if keypairs:
tx.sign(keypairs)
def update_password(self, old_password, new_password):
raise NotImplementedError() # implemented by subclasses
def check_password(self, password):
raise NotImplementedError() # implemented by subclasses
class Imported_KeyStore(Software_KeyStore):
# keystore for imported private keys
@ -129,7 +136,7 @@ class Imported_KeyStore(Software_KeyStore):
type = 'imported'
def __init__(self, d):
Software_KeyStore.__init__(self)
Software_KeyStore.__init__(self, d)
self.keypairs = d.get('keypairs', {})
def is_deterministic(self):
@ -142,6 +149,7 @@ class Imported_KeyStore(Software_KeyStore):
return {
'type': self.type,
'keypairs': self.keypairs,
'pw_hash_version': self.pw_hash_version,
}
def can_import(self):
@ -161,14 +169,14 @@ class Imported_KeyStore(Software_KeyStore):
# there will only be one pubkey-privkey pair for it in self.keypairs,
# and the privkey will encode a txin_type but that txin_type cannot be trusted.
# Removing keys complicates this further.
self.keypairs[pubkey] = pw_encode(serialized_privkey, password)
self.keypairs[pubkey] = pw_encode(serialized_privkey, password, version=self.pw_hash_version)
return txin_type, pubkey
def delete_imported_key(self, key):
self.keypairs.pop(key)
def get_private_key(self, pubkey, password):
sec = pw_decode(self.keypairs[pubkey], password)
sec = pw_decode(self.keypairs[pubkey], password, version=self.pw_hash_version)
txin_type, privkey, compressed = deserialize_privkey(sec)
# this checks the password
if pubkey != ecc.ECPrivkey(privkey).get_public_key_hex(compressed=compressed):
@ -189,16 +197,17 @@ class Imported_KeyStore(Software_KeyStore):
if new_password == '':
new_password = None
for k, v in self.keypairs.items():
b = pw_decode(v, old_password)
c = pw_encode(b, new_password)
b = pw_decode(v, old_password, version=self.pw_hash_version)
c = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
self.keypairs[k] = c
self.pw_hash_version = PW_HASH_VERSION_LATEST
class Deterministic_KeyStore(Software_KeyStore):
def __init__(self, d):
Software_KeyStore.__init__(self)
Software_KeyStore.__init__(self, d)
self.seed = d.get('seed', '')
self.passphrase = d.get('passphrase', '')
@ -206,12 +215,14 @@ class Deterministic_KeyStore(Software_KeyStore):
return True
def dump(self):
d = {}
d = {
'type': self.type,
'pw_hash_version': self.pw_hash_version,
}
if self.seed:
d['seed'] = self.seed
if self.passphrase:
d['passphrase'] = self.passphrase
d['type'] = self.type
return d
def has_seed(self):
@ -226,10 +237,13 @@ class Deterministic_KeyStore(Software_KeyStore):
self.seed = self.format_seed(seed)
def get_seed(self, password):
return pw_decode(self.seed, password)
return pw_decode(self.seed, password, version=self.pw_hash_version)
def get_passphrase(self, password):
return pw_decode(self.passphrase, password) if self.passphrase else ''
if self.passphrase:
return pw_decode(self.passphrase, password, version=self.pw_hash_version)
else:
return ''
class Xpub:
@ -312,10 +326,10 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
return d
def get_master_private_key(self, password):
return pw_decode(self.xprv, password)
return pw_decode(self.xprv, password, version=self.pw_hash_version)
def check_password(self, password):
xprv = pw_decode(self.xprv, password)
xprv = pw_decode(self.xprv, password, version=self.pw_hash_version)
if deserialize_xprv(xprv)[4] != deserialize_xpub(self.xpub)[4]:
raise InvalidPassword()
@ -325,13 +339,14 @@ class BIP32_KeyStore(Deterministic_KeyStore, Xpub):
new_password = None
if self.has_seed():
decoded = self.get_seed(old_password)
self.seed = pw_encode(decoded, new_password)
self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
if self.passphrase:
decoded = self.get_passphrase(old_password)
self.passphrase = pw_encode(decoded, new_password)
self.passphrase = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
if self.xprv is not None:
b = pw_decode(self.xprv, old_password)
self.xprv = pw_encode(b, new_password)
b = pw_decode(self.xprv, old_password, version=self.pw_hash_version)
self.xprv = pw_encode(b, new_password, version=PW_HASH_VERSION_LATEST)
self.pw_hash_version = PW_HASH_VERSION_LATEST
def is_watching_only(self):
return self.xprv is None
@ -362,7 +377,7 @@ class Old_KeyStore(Deterministic_KeyStore):
self.mpk = d.get('mpk')
def get_hex_seed(self, password):
return pw_decode(self.seed, password).encode('utf8')
return pw_decode(self.seed, password, version=self.pw_hash_version).encode('utf8')
def dump(self):
d = Deterministic_KeyStore.dump(self)
@ -484,8 +499,9 @@ class Old_KeyStore(Deterministic_KeyStore):
if new_password == '':
new_password = None
if self.has_seed():
decoded = pw_decode(self.seed, old_password)
self.seed = pw_encode(decoded, new_password)
decoded = pw_decode(self.seed, old_password, version=self.pw_hash_version)
self.seed = pw_encode(decoded, new_password, version=PW_HASH_VERSION_LATEST)
self.pw_hash_version = PW_HASH_VERSION_LATEST

8
electrum/plugins/digitalbitbox/digitalbitbox.py

@ -4,7 +4,7 @@
#
try:
from electrum.crypto import sha256d, EncodeAES, DecodeAES
from electrum.crypto import sha256d, EncodeAES_base64, DecodeAES_base64
from electrum.bitcoin import (TYPE_ADDRESS, push_script, var_int, public_key_to_p2pkh,
is_address)
from electrum.bip32 import serialize_xpub, deserialize_xpub
@ -396,10 +396,10 @@ class DigitalBitbox_Client():
reply = ""
try:
secret = sha256d(self.password)
msg = EncodeAES(secret, msg)
msg = EncodeAES_base64(secret, msg)
reply = self.hid_send_plain(msg)
if 'ciphertext' in reply:
reply = DecodeAES(secret, ''.join(reply["ciphertext"]))
reply = DecodeAES_base64(secret, ''.join(reply["ciphertext"]))
reply = to_string(reply, 'utf8')
reply = json.loads(reply)
if 'error' in reply:
@ -716,7 +716,7 @@ class DigitalBitboxPlugin(HW_PluginBase):
key_s = base64.b64decode(self.digitalbitbox_config['encryptionprivkey'])
args = 'c=data&s=0&dt=0&uuid=%s&pl=%s' % (
self.digitalbitbox_config['comserverchannelid'],
EncodeAES(key_s, json.dumps(payload).encode('ascii')).decode('ascii'),
EncodeAES_base64(key_s, json.dumps(payload).encode('ascii')).decode('ascii'),
)
try:
requests.post(url, args)

27
electrum/tests/test_bitcoin.py

@ -11,11 +11,11 @@ from electrum.bitcoin import (public_key_to_p2pkh, address_from_private_key,
from electrum.bip32 import (bip32_root, bip32_public_derivation, bip32_private_derivation,
xpub_from_xprv, xpub_type, is_xprv, is_bip32_derivation,
is_xpub, convert_bip32_path_to_list_of_uint32)
from electrum.crypto import sha256d
from electrum.crypto import sha256d, KNOWN_PW_HASH_VERSIONS
from electrum import ecc, crypto, constants
from electrum.ecc import number_to_string, string_to_number
from electrum.transaction import opcodes
from electrum.util import bfh, bh2u
from electrum.util import bfh, bh2u, InvalidPassword
from electrum.storage import WalletStorage
from electrum.keystore import xtype_from_derivation
@ -219,23 +219,26 @@ class Test_bitcoin(SequentialTestCase):
"""Make sure AES is homomorphic."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
password = u'secret'
enc = crypto.pw_encode(payload, password)
dec = crypto.pw_decode(enc, password)
self.assertEqual(dec, payload)
for version in KNOWN_PW_HASH_VERSIONS:
enc = crypto.pw_encode(payload, password, version=version)
dec = crypto.pw_decode(enc, password, version=version)
self.assertEqual(dec, payload)
@needs_test_with_all_aes_implementations
def test_aes_encode_without_password(self):
"""When not passed a password, pw_encode is noop on the payload."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
enc = crypto.pw_encode(payload, None)
self.assertEqual(payload, enc)
for version in KNOWN_PW_HASH_VERSIONS:
enc = crypto.pw_encode(payload, None, version=version)
self.assertEqual(payload, enc)
@needs_test_with_all_aes_implementations
def test_aes_deencode_without_password(self):
"""When not passed a password, pw_decode is noop on the payload."""
payload = u'\u66f4\u7a33\u5b9a\u7684\u4ea4\u6613\u5e73\u53f0'
enc = crypto.pw_decode(payload, None)
self.assertEqual(payload, enc)
for version in KNOWN_PW_HASH_VERSIONS:
enc = crypto.pw_decode(payload, None, version=version)
self.assertEqual(payload, enc)
@needs_test_with_all_aes_implementations
def test_aes_decode_with_invalid_password(self):
@ -243,8 +246,10 @@ class Test_bitcoin(SequentialTestCase):
payload = u"blah"
password = u"uber secret"
wrong_password = u"not the password"
enc = crypto.pw_encode(payload, password)
self.assertRaises(Exception, crypto.pw_decode, enc, wrong_password)
for version in KNOWN_PW_HASH_VERSIONS:
enc = crypto.pw_encode(payload, password, version=version)
with self.assertRaises(InvalidPassword):
crypto.pw_decode(enc, wrong_password, version=version)
def test_sha256d(self):
self.assertEqual(b'\x95MZI\xfdp\xd9\xb8\xbc\xdb5\xd2R&x)\x95\x7f~\xf7\xfalt\xf8\x84\x19\xbd\xc5\xe8"\t\xf4',

Loading…
Cancel
Save