|
|
@ -18,7 +18,7 @@ import time |
|
|
|
import binascii |
|
|
|
import hashlib |
|
|
|
import hmac |
|
|
|
from typing import Sequence |
|
|
|
from typing import Sequence, Union |
|
|
|
import cryptography.hazmat.primitives.ciphers.aead as AEAD |
|
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms |
|
|
|
from cryptography.hazmat.backends import default_backend |
|
|
@ -1617,18 +1617,17 @@ class OnionPacket: |
|
|
|
|
|
|
|
|
|
|
|
def get_bolt04_onion_key(key_type: bytes, secret: bytes) -> bytes: |
|
|
|
if key_type not in (b'rho', b'mu', b'um'): |
|
|
|
if key_type not in (b'rho', b'mu', b'um', b'ammag'): |
|
|
|
raise Exception('invalid key_type {}'.format(key_type)) |
|
|
|
key = hmac.new(key_type, msg=secret, digestmod=hashlib.sha256).digest() |
|
|
|
return key |
|
|
|
|
|
|
|
|
|
|
|
def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes, |
|
|
|
hops_data: Sequence[OnionHopsDataSingle], associated_data: bytes) -> OnionPacket: |
|
|
|
def get_shared_secrets_along_route(payment_path_pubkeys: Sequence[bytes], |
|
|
|
session_key: bytes) -> Sequence[bytes]: |
|
|
|
num_hops = len(payment_path_pubkeys) |
|
|
|
hop_shared_secrets = num_hops * [b''] |
|
|
|
ephemeral_key = session_key |
|
|
|
|
|
|
|
# compute shared key for each hop |
|
|
|
for i in range(0, num_hops): |
|
|
|
hop_shared_secrets[i] = get_ecdh(ephemeral_key, payment_path_pubkeys[i]) |
|
|
@ -1638,6 +1637,13 @@ def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes, |
|
|
|
ephemeral_key_int = int.from_bytes(ephemeral_key, byteorder="big") |
|
|
|
ephemeral_key_int = ephemeral_key_int * blinding_factor_int % SECP256k1.order |
|
|
|
ephemeral_key = ephemeral_key_int.to_bytes(32, byteorder="big") |
|
|
|
return hop_shared_secrets |
|
|
|
|
|
|
|
|
|
|
|
def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes, |
|
|
|
hops_data: Sequence[OnionHopsDataSingle], associated_data: bytes) -> OnionPacket: |
|
|
|
num_hops = len(payment_path_pubkeys) |
|
|
|
hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key) |
|
|
|
|
|
|
|
filler = generate_filler(b'rho', num_hops, PER_HOP_FULL_SIZE, hop_shared_secrets) |
|
|
|
mix_header = bytes(HOPS_DATA_SIZE) |
|
|
@ -1726,6 +1732,55 @@ def process_onion_packet(onion_packet: OnionPacket, associated_data: bytes, |
|
|
|
are_we_final = False |
|
|
|
return ProcessedOnionPacket(are_we_final, hop_data, next_onion_packet) |
|
|
|
|
|
|
|
|
|
|
|
class FailedToDecodeOnionError(Exception): pass |
|
|
|
|
|
|
|
|
|
|
|
class OnionRoutingFailureMessage: |
|
|
|
|
|
|
|
def __init__(self, code: int, data: bytes): |
|
|
|
self.code = code |
|
|
|
self.data = data |
|
|
|
|
|
|
|
|
|
|
|
def _decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes], |
|
|
|
session_key: bytes) -> (bytes, int): |
|
|
|
"""Returns the decoded error bytes, and the index of the sender of the error.""" |
|
|
|
num_hops = len(payment_path_pubkeys) |
|
|
|
hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key) |
|
|
|
for i in range(num_hops): |
|
|
|
ammag_key = get_bolt04_onion_key(b'ammag', hop_shared_secrets[i]) |
|
|
|
um_key = get_bolt04_onion_key(b'um', hop_shared_secrets[i]) |
|
|
|
stream_bytes = generate_cipher_stream(ammag_key, len(error_packet)) |
|
|
|
error_packet = xor_bytes(error_packet, stream_bytes) |
|
|
|
hmac_computed = hmac.new(um_key, msg=error_packet[32:], digestmod=hashlib.sha256).digest() |
|
|
|
hmac_found = error_packet[:32] |
|
|
|
if hmac_computed == hmac_found: |
|
|
|
return error_packet, i |
|
|
|
raise FailedToDecodeOnionError() |
|
|
|
|
|
|
|
|
|
|
|
def decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes], |
|
|
|
session_key: bytes) -> (OnionRoutingFailureMessage, int): |
|
|
|
"""Returns the failure message, and the index of the sender of the error.""" |
|
|
|
decrypted_error, sender_index = _decode_onion_error(error_packet, payment_path_pubkeys, session_key) |
|
|
|
failure_msg = get_failure_msg_from_onion_error(decrypted_error) |
|
|
|
return failure_msg, sender_index |
|
|
|
|
|
|
|
|
|
|
|
def get_failure_msg_from_onion_error(decrypted_error_packet: bytes) -> OnionRoutingFailureMessage: |
|
|
|
# get failure_msg bytes from error packet |
|
|
|
failure_len = int.from_bytes(decrypted_error_packet[32:34], byteorder='big') |
|
|
|
failure_msg = decrypted_error_packet[34:34+failure_len] |
|
|
|
# create failure message object |
|
|
|
failure_code = int.from_bytes(failure_msg[:2], byteorder='big') |
|
|
|
failure_data = failure_msg[2:] |
|
|
|
return OnionRoutingFailureMessage(failure_code, failure_data) |
|
|
|
|
|
|
|
|
|
|
|
# <----- bolt 04, "onion" |
|
|
|
|
|
|
|
|
|
|
|
def count_trailing_zeros(index): |
|
|
|
""" BOLT-03 (where_to_put_secret) """ |
|
|
|
try: |
|
|
|