|
|
@ -2,14 +2,32 @@ |
|
|
|
|
|
|
|
import bitstring |
|
|
|
import re |
|
|
|
import hashlib |
|
|
|
from typing import List, NamedTuple |
|
|
|
from bech32 import bech32_decode, CHARSET |
|
|
|
from ecdsa import SECP256k1, VerifyingKey |
|
|
|
from ecdsa.util import sigdecode_string |
|
|
|
from binascii import unhexlify |
|
|
|
|
|
|
|
|
|
|
|
class Route(NamedTuple): |
|
|
|
pubkey: str |
|
|
|
short_channel_id: str |
|
|
|
base_fee_msat: int |
|
|
|
ppm_fee: int |
|
|
|
cltv: int |
|
|
|
|
|
|
|
|
|
|
|
class Invoice(object): |
|
|
|
def __init__(self): |
|
|
|
self.payment_hash: str = None |
|
|
|
self.amount_msat: int = 0 |
|
|
|
self.description: str = None |
|
|
|
payment_hash: str = None |
|
|
|
amount_msat: int = 0 |
|
|
|
description: str = None |
|
|
|
payee: str = None |
|
|
|
date: int = None |
|
|
|
expiry: int = 3600 |
|
|
|
secret: str = None |
|
|
|
route_hints: List[Route] = [] |
|
|
|
min_final_cltv_expiry: int = 18 |
|
|
|
|
|
|
|
|
|
|
|
def decode(pr: str) -> Invoice: |
|
|
@ -26,13 +44,20 @@ def decode(pr: str) -> Invoice: |
|
|
|
|
|
|
|
data = u5_to_bitarray(data) |
|
|
|
|
|
|
|
# Final signature 65 bytes, split it off. |
|
|
|
# final signature 65 bytes, split it off. |
|
|
|
if len(data) < 65 * 8: |
|
|
|
raise ValueError("Too short to contain signature") |
|
|
|
|
|
|
|
# extract the signature |
|
|
|
signature = data[-65 * 8 :].tobytes() |
|
|
|
|
|
|
|
# the tagged fields as a bitstream |
|
|
|
data = bitstring.ConstBitStream(data[: -65 * 8]) |
|
|
|
|
|
|
|
# build the invoice object |
|
|
|
invoice = Invoice() |
|
|
|
|
|
|
|
# decode the amount from the hrp |
|
|
|
m = re.search("[^\d]+", hrp[2:]) |
|
|
|
if m: |
|
|
|
amountstr = hrp[2 + m.end() :] |
|
|
@ -40,11 +65,10 @@ def decode(pr: str) -> Invoice: |
|
|
|
invoice.amount_msat = unshorten_amount(amountstr) |
|
|
|
|
|
|
|
# pull out date |
|
|
|
data.read(35).uint |
|
|
|
invoice.date = data.read(35).uint |
|
|
|
|
|
|
|
while data.pos != data.len: |
|
|
|
tag, tagdata, data = pull_tagged(data) |
|
|
|
|
|
|
|
data_length = len(tagdata) / 5 |
|
|
|
|
|
|
|
if tag == "d": |
|
|
@ -53,6 +77,41 @@ def decode(pr: str) -> Invoice: |
|
|
|
invoice.description = trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "p" and data_length == 52: |
|
|
|
invoice.payment_hash = trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "x": |
|
|
|
invoice.expiry = tagdata.uint |
|
|
|
elif tag == "n": |
|
|
|
invoice.payee = trim_to_bytes(tagdata).hex() |
|
|
|
# this won't work in most cases, we must extract the payee |
|
|
|
# from the signature |
|
|
|
elif tag == "s": |
|
|
|
invoice.secret = trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "r": |
|
|
|
s = bitstring.ConstBitStream(tagdata) |
|
|
|
while s.pos + 264 + 64 + 32 + 32 + 16 < s.len: |
|
|
|
route = Route( |
|
|
|
pubkey=s.read(264).tobytes().hex(), |
|
|
|
short_channel_id=readable_scid(s.read(64).intbe), |
|
|
|
base_fee_msat=s.read(32).intbe, |
|
|
|
ppm_fee=s.read(32).intbe, |
|
|
|
cltv=s.read(16).intbe, |
|
|
|
) |
|
|
|
invoice.route_hints.append(route) |
|
|
|
|
|
|
|
# BOLT #11: |
|
|
|
# A reader MUST check that the `signature` is valid (see the `n` tagged |
|
|
|
# field specified below). |
|
|
|
# A reader MUST use the `n` field to validate the signature instead of |
|
|
|
# performing signature recovery if a valid `n` field is provided. |
|
|
|
message = bytearray([ord(c) for c in hrp]) + data.tobytes() |
|
|
|
sig = signature[0:64] |
|
|
|
if invoice.payee: |
|
|
|
key = VerifyingKey.from_string(unhexlify(invoice.payee), curve=SECP256k1) |
|
|
|
key.verify(sig, message, hashlib.sha256, sigdecode=sigdecode_string) |
|
|
|
else: |
|
|
|
keys = VerifyingKey.from_public_key_recovery(sig, message, SECP256k1, hashlib.sha256) |
|
|
|
signaling_byte = signature[64] |
|
|
|
key = keys[int(signaling_byte)] |
|
|
|
invoice.payee = key.to_string("compressed").hex() |
|
|
|
|
|
|
|
return invoice |
|
|
|
|
|
|
@ -101,6 +160,14 @@ def trim_to_bytes(barr): |
|
|
|
return b |
|
|
|
|
|
|
|
|
|
|
|
def readable_scid(short_channel_id: int) -> str: |
|
|
|
return "{blockheight}x{transactionindex}x{outputindex}".format( |
|
|
|
blockheight=((short_channel_id >> 40) & 0xFFFFFF), |
|
|
|
transactionindex=((short_channel_id >> 16) & 0xFFFFFF), |
|
|
|
outputindex=(short_channel_id & 0xFFFF), |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def u5_to_bitarray(arr): |
|
|
|
ret = bitstring.BitArray() |
|
|
|
for a in arr: |
|
|
|