from binascii import hexlify from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives import serialization from hashlib import sha256 import coincurve import os import socket import struct import threading __all__ = [ 'PrivateKey', 'PublicKey', 'Secret', 'LightningConnection', 'LightningServerSocket', 'connect' ] def hkdf(ikm, salt=b"", info=b""): hkdf = HKDF( algorithm=hashes.SHA256(), length=64, salt=salt, info=info, backend=default_backend()) return hkdf.derive(ikm) def hkdf_two_keys(ikm, salt): t = hkdf(ikm, salt) return t[:32], t[32:] def ecdh(k, rk): k = coincurve.PrivateKey(secret=k.rawkey) rk = coincurve.PublicKey(data=rk.serializeCompressed()) a = k.ecdh(rk.public_key) return Secret(a) def encryptWithAD(k, n, ad, plaintext): chacha = ChaCha20Poly1305(k) return chacha.encrypt(n, plaintext, ad) def decryptWithAD(k, n, ad, ciphertext): chacha = ChaCha20Poly1305(k) return chacha.decrypt(n, ciphertext, ad) class PrivateKey(object): def __init__(self, rawkey): assert len(rawkey) == 32 and isinstance(rawkey, bytes) self.rawkey = rawkey rawkey = int(hexlify(rawkey), base=16) self.key = ec.derive_private_key(rawkey, ec.SECP256K1(), default_backend()) def serializeCompressed(self): return self.key.private_bytes(serialization.Encoding.Raw, serialization.PrivateFormat.Raw, None) def public_key(self): return PublicKey(self.key.public_key()) class Secret(object): def __init__(self, raw): assert(len(raw) == 32) self.raw = raw def __str__(self): return "Secret[0x{}]".format(hexlify(self.raw).decode('ASCII')) class PublicKey(object): def __init__(self, innerkey): # We accept either 33-bytes raw keys, or an EC PublicKey as returned # by cryptography.io if isinstance(innerkey, bytes): innerkey = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256K1(), innerkey ) elif not isinstance(innerkey, ec.EllipticCurvePublicKey): raise ValueError( "Key must either be bytes or ec.EllipticCurvePublicKey" ) self.key = innerkey def serializeCompressed(self): raw = self.key.public_bytes( serialization.Encoding.X962, serialization.PublicFormat.CompressedPoint ) return raw def __str__(self): return "PublicKey[0x{}]".format( hexlify(self.serializeCompressed()).decode('ASCII') ) def Keypair(object): def __init__(self, priv, pub): self.priv, self.pub = priv, pub class Sha256Mixer(object): def __init__(self, base): self.hash = sha256(base).digest() def update(self, data): h = sha256(self.hash) h.update(data) self.hash = h.digest() return self.hash def digest(self): return self.hash def __str__(self): return "Sha256Mixer[0x{}]".format(hexlify(self.hash).decode('ASCII')) class LightningConnection(object): def __init__(self, connection, remote_pubkey, local_privkey, is_initiator): self.connection = connection self.chaining_key = None self.handshake_hash = None self.local_privkey = local_privkey self.local_pubkey = self.local_privkey.public_key() self.remote_pubkey = remote_pubkey self.is_initiator = is_initiator self.init_handshake() self.rn, self.sn = 0, 0 self.send_lock, self.recv_lock = threading.Lock(), threading.Lock() @classmethod def nonce(cls, n): """Transforms a numeric nonce into a byte formatted one Nonce n encoded as 32 zero bits, followed by a little-endian 64-bit value. Note: this follows the Noise Protocol convention, rather than our normal endian. """ return b'\x00' * 4 + struct.pack("