|
|
@ -19,7 +19,10 @@ import time |
|
|
|
import binascii |
|
|
|
import hashlib |
|
|
|
import hmac |
|
|
|
from typing import Sequence |
|
|
|
import cryptography.hazmat.primitives.ciphers.aead as AEAD |
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms |
|
|
|
from cryptography.hazmat.backends import default_backend |
|
|
|
|
|
|
|
from .bitcoin import (public_key_from_private_key, ser_to_point, point_to_ser, |
|
|
|
string_to_number, deserialize_privkey, EC_KEY, rev_hex, int_to_hex, |
|
|
@ -1295,3 +1298,129 @@ class LNPathFinder(PrintError): |
|
|
|
path += [(cur_node, edge_taken)] |
|
|
|
path.reverse() |
|
|
|
return path |
|
|
|
|
|
|
|
|
|
|
|
# bolt 04, "onion" -----> |
|
|
|
|
|
|
|
NUM_MAX_HOPS_IN_PATH = 20 |
|
|
|
HOPS_DATA_SIZE = 1300 # also sometimes called routingInfoSize in bolt-04 |
|
|
|
PER_HOP_FULL_SIZE = 65 # HOPS_DATA_SIZE / 20 |
|
|
|
NUM_STREAM_BYTES = HOPS_DATA_SIZE + PER_HOP_FULL_SIZE |
|
|
|
PER_HOP_PAYLOAD_SIZE = 32 # PER_HOP_FULL_SIZE - len(realm) - len(HMAC) |
|
|
|
PER_HOP_HMAC_SIZE = 32 |
|
|
|
|
|
|
|
|
|
|
|
class OnionPerHop: |
|
|
|
|
|
|
|
def __init__(self, short_channel_id: bytes, amt_to_forward: bytes, outgoing_cltv_value: bytes): |
|
|
|
self.short_channel_id = short_channel_id |
|
|
|
self.amt_to_forward = amt_to_forward |
|
|
|
self.outgoing_cltv_value = outgoing_cltv_value |
|
|
|
|
|
|
|
def to_bytes(self) -> bytes: |
|
|
|
ret = self.short_channel_id |
|
|
|
ret += self.amt_to_forward |
|
|
|
ret += self.outgoing_cltv_value |
|
|
|
ret += bytes(12) # padding |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
class OnionHopsDataSingle: |
|
|
|
|
|
|
|
def __init__(self, per_hop: OnionPerHop): |
|
|
|
self.realm = 0 |
|
|
|
self.per_hop = per_hop |
|
|
|
self.hmac = None |
|
|
|
|
|
|
|
def to_bytes(self) -> bytes: |
|
|
|
ret = bytes([self.realm]) |
|
|
|
ret += self.per_hop.to_bytes() |
|
|
|
ret += self.hmac if self.hmac is not None else bytes(PER_HOP_HMAC_SIZE) |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
class OnionPacket: |
|
|
|
|
|
|
|
def __init__(self, public_key: bytes, hops_data: bytes, hmac: bytes): |
|
|
|
self.version = 0 |
|
|
|
self.public_key = public_key |
|
|
|
self.hops_data = hops_data # also called RoutingInfo in bolt-04 |
|
|
|
self.hmac = hmac |
|
|
|
|
|
|
|
def to_bytes(self) -> bytes: |
|
|
|
ret = bytes([self.version]) |
|
|
|
ret += self.public_key |
|
|
|
ret += self.hops_data |
|
|
|
ret += self.hmac |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
def get_bolt04_onion_key(key_type: bytes, secret: bytes) -> bytes: |
|
|
|
if key_type not in (b'rho', b'mu', b'um'): |
|
|
|
raise Exception('invalid key_type {}'.format(key_type)) |
|
|
|
key = hmac.new(key_type, msg=secret, digestmod=hashlib.sha256).digest() |
|
|
|
return key |
|
|
|
|
|
|
|
|
|
|
|
def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes, |
|
|
|
hops_data: Sequence[OnionHopsDataSingle], associated_data: bytes) -> OnionPacket: |
|
|
|
num_hops = len(payment_path_pubkeys) |
|
|
|
hop_shared_secrets = num_hops * [b''] |
|
|
|
ephemeral_key = session_key |
|
|
|
|
|
|
|
# compute shared key for each hop |
|
|
|
for i in range(0, num_hops): |
|
|
|
hop_shared_secrets[i] = get_ecdh(ephemeral_key, payment_path_pubkeys[i]) |
|
|
|
ephemeral_pubkey = bfh(EC_KEY(ephemeral_key).get_public_key()) |
|
|
|
blinding_factor = H256(ephemeral_pubkey + hop_shared_secrets[i]) |
|
|
|
blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big") |
|
|
|
ephemeral_key_int = int.from_bytes(ephemeral_key, byteorder="big") |
|
|
|
ephemeral_key_int = ephemeral_key_int * blinding_factor_int % SECP256k1.order |
|
|
|
ephemeral_key = ephemeral_key_int.to_bytes(32, byteorder="big") |
|
|
|
|
|
|
|
filler = generate_filler(b'rho', num_hops, PER_HOP_FULL_SIZE, hop_shared_secrets) |
|
|
|
mix_header = bytearray(HOPS_DATA_SIZE) |
|
|
|
next_hmac = bytearray(PER_HOP_HMAC_SIZE) |
|
|
|
|
|
|
|
# compute routing info and MAC for each hop |
|
|
|
for i in range(num_hops-1, -1, -1): |
|
|
|
rho_key = get_bolt04_onion_key(b'rho', hop_shared_secrets[i]) |
|
|
|
mu_key = get_bolt04_onion_key(b'mu', hop_shared_secrets[i]) |
|
|
|
hops_data[i].hmac = next_hmac |
|
|
|
stream_bytes = generate_cipher_stream(rho_key, NUM_STREAM_BYTES) |
|
|
|
mix_header = mix_header[:-PER_HOP_FULL_SIZE] |
|
|
|
mix_header = hops_data[i].to_bytes() + mix_header |
|
|
|
mix_header = ((int.from_bytes(mix_header, "big") ^ int.from_bytes(stream_bytes[:HOPS_DATA_SIZE], "big")) |
|
|
|
.to_bytes(HOPS_DATA_SIZE, "big")) |
|
|
|
if i == num_hops - 1: |
|
|
|
mix_header = mix_header[:-len(filler)] + filler |
|
|
|
packet = mix_header + associated_data |
|
|
|
next_hmac = hmac.new(mu_key, msg=packet, digestmod=hashlib.sha256).digest() |
|
|
|
|
|
|
|
return OnionPacket( |
|
|
|
public_key=bfh(EC_KEY(session_key).get_public_key()), |
|
|
|
hops_data=bytes(mix_header), |
|
|
|
hmac=next_hmac) |
|
|
|
|
|
|
|
|
|
|
|
def generate_filler(key_type: bytes, num_hops: int, hop_size: int, |
|
|
|
shared_secrets: Sequence[bytes]) -> bytes: |
|
|
|
filler_size = (NUM_MAX_HOPS_IN_PATH + 1) * hop_size |
|
|
|
filler = bytearray(filler_size) |
|
|
|
|
|
|
|
for i in range(0, num_hops-1): # -1, as last hop does not obfuscate |
|
|
|
filler = filler[hop_size:] |
|
|
|
filler += bytearray(hop_size) |
|
|
|
stream_key = get_bolt04_onion_key(key_type, shared_secrets[i]) |
|
|
|
stream_bytes = generate_cipher_stream(stream_key, filler_size) |
|
|
|
filler = ((int.from_bytes(filler, "big") ^ int.from_bytes(stream_bytes, "big")) |
|
|
|
.to_bytes(filler_size, "big")) |
|
|
|
|
|
|
|
return filler[(NUM_MAX_HOPS_IN_PATH-num_hops+2)*hop_size:] |
|
|
|
|
|
|
|
|
|
|
|
def generate_cipher_stream(stream_key: bytes, num_bytes: int) -> bytes: |
|
|
|
algo = algorithms.ChaCha20(stream_key, nonce=bytes(16)) |
|
|
|
cipher = Cipher(algo, mode=None, backend=default_backend()) |
|
|
|
encryptor = cipher.encryptor() |
|
|
|
return encryptor.update(bytes(num_bytes)) |
|
|
|