|
|
@ -1,12 +1,10 @@ |
|
|
|
# type: ignore |
|
|
|
|
|
|
|
import bitstring |
|
|
|
import bitstring # type: ignore |
|
|
|
import re |
|
|
|
import hashlib |
|
|
|
from typing import List, NamedTuple |
|
|
|
from typing import List, NamedTuple, Optional |
|
|
|
from bech32 import bech32_decode, CHARSET |
|
|
|
from ecdsa import SECP256k1, VerifyingKey |
|
|
|
from ecdsa.util import sigdecode_string |
|
|
|
from ecdsa import SECP256k1, VerifyingKey # type: ignore |
|
|
|
from ecdsa.util import sigdecode_string # type: ignore |
|
|
|
from binascii import unhexlify |
|
|
|
|
|
|
|
|
|
|
@ -19,40 +17,40 @@ class Route(NamedTuple): |
|
|
|
|
|
|
|
|
|
|
|
class Invoice(object): |
|
|
|
payment_hash: str = None |
|
|
|
payment_hash: str |
|
|
|
amount_msat: int = 0 |
|
|
|
description: str = None |
|
|
|
payee: str = None |
|
|
|
date: int = None |
|
|
|
description: Optional[str] = None |
|
|
|
description_hash: Optional[str] = None |
|
|
|
payee: str |
|
|
|
date: int |
|
|
|
expiry: int = 3600 |
|
|
|
secret: str = None |
|
|
|
secret: Optional[str] = None |
|
|
|
route_hints: List[Route] = [] |
|
|
|
min_final_cltv_expiry: int = 18 |
|
|
|
|
|
|
|
|
|
|
|
def decode(pr: str) -> Invoice: |
|
|
|
""" Super naïve bolt11 decoder, |
|
|
|
only gets payment_hash, description/description_hash and amount in msatoshi. |
|
|
|
"""bolt11 decoder, |
|
|
|
based on https://github.com/rustyrussell/lightning-payencode/blob/master/lnaddr.py |
|
|
|
""" |
|
|
|
hrp, data = bech32_decode(pr) |
|
|
|
if not hrp: |
|
|
|
raise ValueError("Bad bech32 checksum") |
|
|
|
|
|
|
|
hrp, decoded_data = bech32_decode(pr) |
|
|
|
if hrp is None or decoded_data is None: |
|
|
|
raise ValueError("Bad bech32 checksum") |
|
|
|
if not hrp.startswith("ln"): |
|
|
|
raise ValueError("Does not start with ln") |
|
|
|
|
|
|
|
data = u5_to_bitarray(data) |
|
|
|
bitarray = _u5_to_bitarray(decoded_data) |
|
|
|
|
|
|
|
# final signature 65 bytes, split it off. |
|
|
|
if len(data) < 65 * 8: |
|
|
|
if len(bitarray) < 65 * 8: |
|
|
|
raise ValueError("Too short to contain signature") |
|
|
|
|
|
|
|
# extract the signature |
|
|
|
signature = data[-65 * 8 :].tobytes() |
|
|
|
signature = bitarray[-65 * 8 :].tobytes() |
|
|
|
|
|
|
|
# the tagged fields as a bitstream |
|
|
|
data = bitstring.ConstBitStream(data[: -65 * 8]) |
|
|
|
data = bitstring.ConstBitStream(bitarray[: -65 * 8]) |
|
|
|
|
|
|
|
# build the invoice object |
|
|
|
invoice = Invoice() |
|
|
@ -62,35 +60,35 @@ def decode(pr: str) -> Invoice: |
|
|
|
if m: |
|
|
|
amountstr = hrp[2 + m.end() :] |
|
|
|
if amountstr != "": |
|
|
|
invoice.amount_msat = unshorten_amount(amountstr) |
|
|
|
invoice.amount_msat = _unshorten_amount(amountstr) |
|
|
|
|
|
|
|
# pull out date |
|
|
|
invoice.date = data.read(35).uint |
|
|
|
|
|
|
|
while data.pos != data.len: |
|
|
|
tag, tagdata, data = pull_tagged(data) |
|
|
|
tag, tagdata, data = _pull_tagged(data) |
|
|
|
data_length = len(tagdata) / 5 |
|
|
|
|
|
|
|
if tag == "d": |
|
|
|
invoice.description = trim_to_bytes(tagdata).decode("utf-8") |
|
|
|
invoice.description = _trim_to_bytes(tagdata).decode("utf-8") |
|
|
|
elif tag == "h" and data_length == 52: |
|
|
|
invoice.description = trim_to_bytes(tagdata).hex() |
|
|
|
invoice.description_hash = _trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "p" and data_length == 52: |
|
|
|
invoice.payment_hash = trim_to_bytes(tagdata).hex() |
|
|
|
invoice.payment_hash = _trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "x": |
|
|
|
invoice.expiry = tagdata.uint |
|
|
|
elif tag == "n": |
|
|
|
invoice.payee = trim_to_bytes(tagdata).hex() |
|
|
|
invoice.payee = _trim_to_bytes(tagdata).hex() |
|
|
|
# this won't work in most cases, we must extract the payee |
|
|
|
# from the signature |
|
|
|
elif tag == "s": |
|
|
|
invoice.secret = trim_to_bytes(tagdata).hex() |
|
|
|
invoice.secret = _trim_to_bytes(tagdata).hex() |
|
|
|
elif tag == "r": |
|
|
|
s = bitstring.ConstBitStream(tagdata) |
|
|
|
while s.pos + 264 + 64 + 32 + 32 + 16 < s.len: |
|
|
|
route = Route( |
|
|
|
pubkey=s.read(264).tobytes().hex(), |
|
|
|
short_channel_id=readable_scid(s.read(64).intbe), |
|
|
|
short_channel_id=_readable_scid(s.read(64).intbe), |
|
|
|
base_fee_msat=s.read(32).intbe, |
|
|
|
ppm_fee=s.read(32).intbe, |
|
|
|
cltv=s.read(16).intbe, |
|
|
@ -116,7 +114,7 @@ def decode(pr: str) -> Invoice: |
|
|
|
return invoice |
|
|
|
|
|
|
|
|
|
|
|
def unshorten_amount(amount: str) -> int: |
|
|
|
def _unshorten_amount(amount: str) -> int: |
|
|
|
""" Given a shortened amount, return millisatoshis |
|
|
|
""" |
|
|
|
# BOLT #11: |
|
|
@ -141,18 +139,18 @@ def unshorten_amount(amount: str) -> int: |
|
|
|
raise ValueError("Invalid amount '{}'".format(amount)) |
|
|
|
|
|
|
|
if unit in units: |
|
|
|
return int(amount[:-1]) * 100_000_000_000 / units[unit] |
|
|
|
return int(int(amount[:-1]) * 100_000_000_000 / units[unit]) |
|
|
|
else: |
|
|
|
return int(amount) * 100_000_000_000 |
|
|
|
|
|
|
|
|
|
|
|
def pull_tagged(stream): |
|
|
|
def _pull_tagged(stream): |
|
|
|
tag = stream.read(5).uint |
|
|
|
length = stream.read(5).uint * 32 + stream.read(5).uint |
|
|
|
return (CHARSET[tag], stream.read(length * 5), stream) |
|
|
|
|
|
|
|
|
|
|
|
def trim_to_bytes(barr): |
|
|
|
def _trim_to_bytes(barr): |
|
|
|
# Adds a byte if necessary. |
|
|
|
b = barr.tobytes() |
|
|
|
if barr.len % 8 != 0: |
|
|
@ -160,7 +158,7 @@ def trim_to_bytes(barr): |
|
|
|
return b |
|
|
|
|
|
|
|
|
|
|
|
def readable_scid(short_channel_id: int) -> str: |
|
|
|
def _readable_scid(short_channel_id: int) -> str: |
|
|
|
return "{blockheight}x{transactionindex}x{outputindex}".format( |
|
|
|
blockheight=((short_channel_id >> 40) & 0xFFFFFF), |
|
|
|
transactionindex=((short_channel_id >> 16) & 0xFFFFFF), |
|
|
@ -168,7 +166,7 @@ def readable_scid(short_channel_id: int) -> str: |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def u5_to_bitarray(arr): |
|
|
|
def _u5_to_bitarray(arr: List[int]) -> bitstring.BitArray: |
|
|
|
ret = bitstring.BitArray() |
|
|
|
for a in arr: |
|
|
|
ret += bitstring.pack("uint:5", a) |
|
|
|