Browse Source

crypto: add 'cryptography' as alt dependency for 'pycryptodomex'

hard-fail-on-bad-server-string
SomberNight 5 years ago
parent
commit
74a3faf803
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 97
      electrum/crypto.py
  2. 84
      electrum/tests/test_bitcoin.py
  3. 4
      electrum/tests/test_lnpeer.py
  4. 5
      electrum/tests/test_lnrouter.py
  5. 3
      electrum/tests/test_lntransport.py

97
electrum/crypto.py

@ -25,6 +25,7 @@
import base64
import os
import sys
import hashlib
import hmac
from typing import Union
@ -35,12 +36,33 @@ from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFile
from .i18n import _
HAS_CRYPTODOME = False
try:
from Cryptodome.Cipher import AES
from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
from Cryptodome.Cipher import AES as CD_AES
except:
AES = None
pass
else:
HAS_CRYPTODOME = True
HAS_CRYPTOGRAPHY = False
try:
import cryptography
from cryptography import exceptions
from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
from cryptography.hazmat.primitives.ciphers import modes as CG_modes
from cryptography.hazmat.backends import default_backend as CG_default_backend
import cryptography.hazmat.primitives.ciphers.aead as CG_aead
except:
pass
else:
HAS_CRYPTOGRAPHY = True
from Cryptodome.Cipher import ChaCha20_Poly1305, ChaCha20
if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
class InvalidPadding(Exception):
@ -69,8 +91,12 @@ def strip_PKCS7_padding(data: bytes) -> bytes:
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
assert_bytes(key, iv, data)
data = append_PKCS7_padding(data)
if AES:
e = AES.new(key, AES.MODE_CBC, iv).encrypt(data)
if HAS_CRYPTODOME:
e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
elif HAS_CRYPTOGRAPHY:
cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
encryptor = cipher.encryptor()
e = encryptor.update(data) + encryptor.finalize()
else:
aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
@ -80,9 +106,13 @@ def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
assert_bytes(key, iv, data)
if AES:
cipher = AES.new(key, AES.MODE_CBC, iv)
if HAS_CRYPTODOME:
cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
data = cipher.decrypt(data)
elif HAS_CRYPTOGRAPHY:
cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
decryptor = cipher.decryptor()
data = decryptor.update(data) + decryptor.finalize()
else:
aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
@ -221,19 +251,52 @@ def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
def chacha20_poly1305_encrypt(*, key: bytes, nonce: bytes, associated_data: bytes, data: bytes) -> bytes:
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(associated_data)
ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
return ciphertext + mac
assert isinstance(key, (bytes, bytearray))
assert isinstance(nonce, (bytes, bytearray))
assert isinstance(associated_data, (bytes, bytearray))
assert isinstance(data, (bytes, bytearray))
if HAS_CRYPTODOME:
cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(associated_data)
ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
return ciphertext + mac
if HAS_CRYPTOGRAPHY:
a = CG_aead.ChaCha20Poly1305(key)
return a.encrypt(nonce, data, associated_data)
raise Exception("no chacha20 backed found")
def chacha20_poly1305_decrypt(*, key: bytes, nonce: bytes, associated_data: bytes, data: bytes) -> bytes:
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(associated_data)
# raises ValueError if not valid (e.g. incorrect MAC)
return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
assert isinstance(key, (bytes, bytearray))
assert isinstance(nonce, (bytes, bytearray))
assert isinstance(associated_data, (bytes, bytearray))
assert isinstance(data, (bytes, bytearray))
if HAS_CRYPTODOME:
cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
cipher.update(associated_data)
# raises ValueError if not valid (e.g. incorrect MAC)
return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
if HAS_CRYPTOGRAPHY:
a = CG_aead.ChaCha20Poly1305(key)
try:
return a.decrypt(nonce, data, associated_data)
except cryptography.exceptions.InvalidTag as e:
raise ValueError("invalid tag") from e
raise Exception("no chacha20 backed found")
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
cipher = ChaCha20.new(key=key, nonce=nonce)
return cipher.encrypt(data)
assert isinstance(key, (bytes, bytearray))
assert isinstance(nonce, (bytes, bytearray))
assert isinstance(data, (bytes, bytearray))
assert len(nonce) == 8, f"unexpected nonce size: {len(nonce)} (expected: 8)"
if HAS_CRYPTODOME:
cipher = CD_ChaCha20.new(key=key, nonce=nonce)
return cipher.encrypt(data)
if HAS_CRYPTOGRAPHY:
nonce = bytes(8) + nonce # cryptography wants 16 byte nonces
algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
encryptor = cipher.encryptor()
return encryptor.update(data)
raise Exception("no chacha20 backed found")

84
electrum/tests/test_bitcoin.py

@ -34,8 +34,8 @@ except ImportError:
def needs_test_with_all_aes_implementations(func):
"""Function decorator to run a unit test twice:
once when pycryptodomex is not available, once when it is.
"""Function decorator to run a unit test multiple times:
once with each AES implementation.
NOTE: this is inherently sequential;
tests running in parallel would break things
@ -44,18 +44,46 @@ def needs_test_with_all_aes_implementations(func):
if FAST_TESTS: # if set, only run tests once, using fastest implementation
func(*args, **kwargs)
return
_aes = crypto.AES
crypto.AES = None
has_cryptodome = crypto.HAS_CRYPTODOME
has_cryptography = crypto.HAS_CRYPTOGRAPHY
try:
# first test without pycryptodomex
func(*args, **kwargs)
(crypto.HAS_CRYPTODOME, crypto.HAS_CRYPTOGRAPHY) = False, False
func(*args, **kwargs) # pyaes
if has_cryptodome:
(crypto.HAS_CRYPTODOME, crypto.HAS_CRYPTOGRAPHY) = True, False
func(*args, **kwargs) # cryptodome
if has_cryptography:
(crypto.HAS_CRYPTODOME, crypto.HAS_CRYPTOGRAPHY) = False, True
func(*args, **kwargs) # cryptography
finally:
crypto.AES = _aes
# if pycryptodomex is not available, we are done
if not _aes:
crypto.HAS_CRYPTODOME = has_cryptodome
crypto.HAS_CRYPTOGRAPHY = has_cryptography
return run_test
def needs_test_with_all_chacha20_implementations(func):
"""Function decorator to run a unit test multiple times:
once with each ChaCha20/Poly1305 implementation.
NOTE: this is inherently sequential;
tests running in parallel would break things
"""
def run_test(*args, **kwargs):
if FAST_TESTS: # if set, only run tests once, using fastest implementation
func(*args, **kwargs)
return
# if pycryptodomex is available, test again now
func(*args, **kwargs)
has_cryptodome = crypto.HAS_CRYPTODOME
has_cryptography = crypto.HAS_CRYPTOGRAPHY
try:
if has_cryptodome:
(crypto.HAS_CRYPTODOME, crypto.HAS_CRYPTOGRAPHY) = True, False
func(*args, **kwargs) # cryptodome
if has_cryptography:
(crypto.HAS_CRYPTODOME, crypto.HAS_CRYPTOGRAPHY) = False, True
func(*args, **kwargs) # cryptography
finally:
crypto.HAS_CRYPTODOME = has_cryptodome
crypto.HAS_CRYPTOGRAPHY = has_cryptography
return run_test
@ -67,7 +95,11 @@ class Test_bitcoin(ElectrumTestCase):
def test_pycryptodomex_is_available(self):
# we want the unit testing framework to test with pycryptodomex available.
self.assertTrue(bool(crypto.AES))
self.assertTrue(bool(crypto.HAS_CRYPTODOME))
def test_cryptography_is_available(self):
# we want the unit testing framework to test with cryptography available.
self.assertTrue(bool(crypto.HAS_CRYPTOGRAPHY))
@needs_test_with_all_aes_implementations
def test_crypto(self):
@ -223,6 +255,34 @@ class Test_bitcoin(ElectrumTestCase):
with self.assertRaises(InvalidPassword):
crypto.pw_decode(enc, wrong_password, version=version)
@needs_test_with_all_chacha20_implementations
def test_chacha20_poly1305_encrypt(self):
key = bytes.fromhex('37326d9d69a83b815ddfd947d21b0dd39111e5b6a5a44042c44d570ea03e3179')
nonce = bytes.fromhex('010203040506070809101112')
associated_data = bytes.fromhex('30c9572d4305d4f3ccb766b1db884da6f1e0086f55136a39740700c272095717')
data = bytes.fromhex('4a6cd75da76cedf0a8a47e3a5734a328')
self.assertEqual(bytes.fromhex('90fb51fcde1fbe4013500bd7a32280445d80ee21f0aa3acd30df72cf609de064'),
crypto.chacha20_poly1305_encrypt(key=key, nonce=nonce, associated_data=associated_data, data=data))
@needs_test_with_all_chacha20_implementations
def test_chacha20_poly1305_decrypt(self):
key = bytes.fromhex('37326d9d69a83b815ddfd947d21b0dd39111e5b6a5a44042c44d570ea03e3179')
nonce = bytes.fromhex('010203040506070809101112')
associated_data = bytes.fromhex('30c9572d4305d4f3ccb766b1db884da6f1e0086f55136a39740700c272095717')
data = bytes.fromhex('90fb51fcde1fbe4013500bd7a32280445d80ee21f0aa3acd30df72cf609de064')
self.assertEqual(bytes.fromhex('4a6cd75da76cedf0a8a47e3a5734a328'),
crypto.chacha20_poly1305_decrypt(key=key, nonce=nonce, associated_data=associated_data, data=data))
with self.assertRaises(ValueError):
crypto.chacha20_poly1305_decrypt(key=key, nonce=nonce, associated_data=b'', data=data)
@needs_test_with_all_chacha20_implementations
def test_chacha20_encrypt(self):
key = bytes.fromhex('37326d9d69a83b815ddfd947d21b0dd39111e5b6a5a44042c44d570ea03e3179')
nonce = bytes.fromhex('0102030405060708')
data = bytes.fromhex('38a0e0a7c865fe9ca31f0730cfcab610f18e6da88dc3790f1d243f711a257c78')
self.assertEqual(bytes.fromhex('f62fbd74d197323c7c3d5658476a884d38ee6f4b5500add1e8dc80dcd9c15dff'),
crypto.chacha20_encrypt(key=key, nonce=nonce, data=data))
def test_sha256d(self):
self.assertEqual(b'\x95MZI\xfdp\xd9\xb8\xbc\xdb5\xd2R&x)\x95\x7f~\xf7\xfalt\xf8\x84\x19\xbd\xc5\xe8"\t\xf4',
sha256d(u"test"))

4
electrum/tests/test_lnpeer.py

@ -28,6 +28,7 @@ from electrum.logging import console_stderr_handler
from electrum.lnworker import PaymentInfo, RECEIVED, PR_UNPAID
from .test_lnchannel import create_test_channels
from .test_bitcoin import needs_test_with_all_chacha20_implementations
from . import ElectrumTestCase
def keypair():
@ -244,6 +245,7 @@ class TestPeer(ElectrumTestCase):
with self.assertRaises(concurrent.futures.CancelledError):
run(f())
@needs_test_with_all_chacha20_implementations
def test_reestablish_with_old_state(self):
alice_channel, bob_channel = create_test_channels()
alice_channel_0, bob_channel_0 = create_test_channels() # these are identical
@ -277,6 +279,7 @@ class TestPeer(ElectrumTestCase):
with self.assertRaises(concurrent.futures.CancelledError):
run(f())
@needs_test_with_all_chacha20_implementations
def test_payment(self):
alice_channel, bob_channel = create_test_channels()
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)
@ -291,6 +294,7 @@ class TestPeer(ElectrumTestCase):
with self.assertRaises(concurrent.futures.CancelledError):
run(f())
@needs_test_with_all_chacha20_implementations
def test_close(self):
alice_channel, bob_channel = create_test_channels()
p1, p2, w1, w2, _q1, _q2 = self.prepare_peers(alice_channel, bob_channel)

5
electrum/tests/test_lnrouter.py

@ -12,6 +12,8 @@ from electrum.constants import BitcoinTestnet
from electrum.simple_config import SimpleConfig
from . import TestCaseForTestnet
from .test_bitcoin import needs_test_with_all_chacha20_implementations
class Test_LNRouter(TestCaseForTestnet):
@ -108,6 +110,7 @@ class Test_LNRouter(TestCaseForTestnet):
self._loop_thread.join(timeout=1)
cdb.sql_thread.join(timeout=1)
@needs_test_with_all_chacha20_implementations
def test_new_onion_packet(self):
# test vector from bolt-04
payment_path_pubkeys = [
@ -140,6 +143,7 @@ class Test_LNRouter(TestCaseForTestnet):
self.assertEqual(bfh('0002eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619e5f14350c2a76fc232b5e46d421e9615471ab9e0bc887beff8c95fdb878f7b3a71da571226458c510bbadd1276f045c21c520a07d35da256ef75b4367962437b0dd10f7d61ab590531cf08000178a333a347f8b4072e216400406bdf3bf038659793a86cae5f52d32f3438527b47a1cfc54285a8afec3a4c9f3323db0c946f5d4cb2ce721caad69320c3a469a202f3e468c67eaf7a7cda226d0fd32f7b48084dca885d15222e60826d5d971f64172d98e0760154400958f00e86697aa1aa9d41bee8119a1ec866abe044a9ad635778ba61fc0776dc832b39451bd5d35072d2269cf9b040d6ba38b54ec35f81d7fc67678c3be47274f3c4cc472aff005c3469eb3bc140769ed4c7f0218ff8c6c7dd7221d189c65b3b9aaa71a01484b122846c7c7b57e02e679ea8469b70e14fe4f70fee4d87b910cf144be6fe48eef24da475c0b0bcc6565ae82cd3f4e3b24c76eaa5616c6111343306ab35c1fe5ca4a77c0e314ed7dba39d6f1e0de791719c241a939cc493bea2bae1c1e932679ea94d29084278513c77b899cc98059d06a27d171b0dbdf6bee13ddc4fc17a0c4d2827d488436b57baa167544138ca2e64a11b43ac8a06cd0c2fba2d4d900ed2d9205305e2d7383cc98dacb078133de5f6fb6bed2ef26ba92cea28aafc3b9948dd9ae5559e8bd6920b8cea462aa445ca6a95e0e7ba52961b181c79e73bd581821df2b10173727a810c92b83b5ba4a0403eb710d2ca10689a35bec6c3a708e9e92f7d78ff3c5d9989574b00c6736f84c199256e76e19e78f0c98a9d580b4a658c84fc8f2096c2fbea8f5f8c59d0fdacb3be2802ef802abbecb3aba4acaac69a0e965abd8981e9896b1f6ef9d60f7a164b371af869fd0e48073742825e9434fc54da837e120266d53302954843538ea7c6c3dbfb4ff3b2fdbe244437f2a153ccf7bdb4c92aa08102d4f3cff2ae5ef86fab4653595e6a5837fa2f3e29f27a9cde5966843fb847a4a61f1e76c281fe8bb2b0a181d096100db5a1a5ce7a910238251a43ca556712eaadea167fb4d7d75825e440f3ecd782036d7574df8bceacb397abefc5f5254d2722215c53ff54af8299aaaad642c6d72a14d27882d9bbd539e1cc7a527526ba89b8c037ad09120e98ab042d3e8652b31ae0e478516bfaf88efca9f3676ffe99d2819dcaeb7610a626695f53117665d267d3f7abebd6bbd6733f645c72c389f03855bdf1e4b8075b516569b118233a0f0971d24b83113c0b096f5216a207ca99a7cddc81c130923fe3d91e7508c9ac5f2e914ff5dccab9e558566fa14efb34ac98d878580814b94b73acbfde9072f30b881f7f0fff42d4045d1ace6322d86a97d164aa84d93a60498065cc7c20e636f5862dc81531a88c60305a2e59a985be327a6902e4bed986dbf4a0b50c217af0ea7fdf9ab37f9ea1a1aaa72f54cf40154ea9b269f1a7c09f9f43245109431a175d50e2db0132337baa0ef97eed0fcf20489da36b79a1172faccc2f7ded7c60e00694282d93359c4682135642bc81f433574aa8ef0c97b4ade7ca372c5ffc23c7eddd839bab4e0f14d6df15c9dbeab176bec8b5701cf054eb3072f6dadc98f88819042bf10c407516ee58bce33fbe3b3d86a54255e577db4598e30a135361528c101683a5fcde7e8ba53f3456254be8f45fe3a56120ae96ea3773631fcb3873aa3abd91bcff00bd38bd43697a2e789e00da6077482e7b1b1a677b5afae4c54e6cbdf7377b694eb7d7a5b913476a5be923322d3de06060fd5e819635232a2cf4f0731da13b8546d1d6d4f8d75b9fce6c2341a71b0ea6f780df54bfdb0dd5cd9855179f602f917265f21f9190c70217774a6fbaaa7d63ad64199f4664813b955cff954949076dcf'),
packet.to_bytes())
@needs_test_with_all_chacha20_implementations
def test_process_onion_packet(self):
# this test is not from bolt-04, but is based on the one there;
# except here we have the privkeys for these pubkeys
@ -184,6 +188,7 @@ class Test_LNRouter(TestCaseForTestnet):
self.assertEqual(hops_data[i].per_hop.to_bytes(), processed_packet.hop_data.per_hop.to_bytes())
packet = processed_packet.next_packet
@needs_test_with_all_chacha20_implementations
def test_decode_onion_error(self):
# test vector from bolt-04
payment_path_pubkeys = [

3
electrum/tests/test_lntransport.py

@ -5,10 +5,12 @@ from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport
from . import ElectrumTestCase
from .test_bitcoin import needs_test_with_all_chacha20_implementations
class TestLNTransport(ElectrumTestCase):
@needs_test_with_all_chacha20_implementations
def test_responder(self):
# local static
ls_priv=bytes.fromhex('2121212121212121212121212121212121212121212121212121212121212121')
@ -37,6 +39,7 @@ class TestLNTransport(ElectrumTestCase):
transport = LNResponderTransport(ls_priv, Reader(), Writer())
asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))
@needs_test_with_all_chacha20_implementations
def test_loop(self):
loop = asyncio.get_event_loop()
responder_shaked = asyncio.Event()

Loading…
Cancel
Save