|
|
@ -6,7 +6,7 @@ import time |
|
|
|
from hashlib import sha256 |
|
|
|
from binascii import hexlify |
|
|
|
from decimal import Decimal |
|
|
|
from typing import Optional, TYPE_CHECKING |
|
|
|
from typing import Optional, TYPE_CHECKING, Type |
|
|
|
|
|
|
|
import random |
|
|
|
import bitstring |
|
|
@ -15,6 +15,7 @@ from .bitcoin import hash160_to_b58_address, b58_address_to_hash160, TOTAL_COIN_ |
|
|
|
from .segwit_addr import bech32_encode, bech32_decode, CHARSET |
|
|
|
from . import segwit_addr |
|
|
|
from . import constants |
|
|
|
from .constants import AbstractNet |
|
|
|
from . import ecc |
|
|
|
from .bitcoin import COIN |
|
|
|
|
|
|
@ -83,57 +84,42 @@ def bitarray_to_u5(barr): |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
def encode_fallback(fallback: str, currency): |
|
|
|
def encode_fallback(fallback: str, net: Type[AbstractNet]): |
|
|
|
""" Encode all supported fallback addresses. |
|
|
|
""" |
|
|
|
if currency in [constants.BitcoinMainnet.SEGWIT_HRP, constants.BitcoinTestnet.SEGWIT_HRP]: |
|
|
|
wver, wprog_ints = segwit_addr.decode_segwit_address(currency, fallback) |
|
|
|
if wver is not None: |
|
|
|
wprog = bytes(wprog_ints) |
|
|
|
else: |
|
|
|
addrtype, addr = b58_address_to_hash160(fallback) |
|
|
|
if is_p2pkh(currency, addrtype): |
|
|
|
wver = 17 |
|
|
|
elif is_p2sh(currency, addrtype): |
|
|
|
wver = 18 |
|
|
|
else: |
|
|
|
raise ValueError("Unknown address type for {}".format(currency)) |
|
|
|
wprog = addr |
|
|
|
return tagged('f', bitstring.pack("uint:5", wver) + wprog) |
|
|
|
wver, wprog_ints = segwit_addr.decode_segwit_address(net.SEGWIT_HRP, fallback) |
|
|
|
if wver is not None: |
|
|
|
wprog = bytes(wprog_ints) |
|
|
|
else: |
|
|
|
raise NotImplementedError("Support for currency {} not implemented".format(currency)) |
|
|
|
|
|
|
|
|
|
|
|
def parse_fallback(fallback, currency): |
|
|
|
if currency in [constants.BitcoinMainnet.SEGWIT_HRP, constants.BitcoinTestnet.SEGWIT_HRP]: |
|
|
|
wver = fallback[0:5].uint |
|
|
|
if wver == 17: |
|
|
|
addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][0]) |
|
|
|
elif wver == 18: |
|
|
|
addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][1]) |
|
|
|
elif wver <= 16: |
|
|
|
witprog = fallback[5:] # cut witver |
|
|
|
witprog = witprog[:len(witprog) // 8 * 8] # can only be full bytes |
|
|
|
witprog = witprog.tobytes() |
|
|
|
addr = segwit_addr.encode_segwit_address(currency, wver, witprog) |
|
|
|
addrtype, addr = b58_address_to_hash160(fallback) |
|
|
|
if addrtype == net.ADDRTYPE_P2PKH: |
|
|
|
wver = 17 |
|
|
|
elif addrtype == net.ADDRTYPE_P2SH: |
|
|
|
wver = 18 |
|
|
|
else: |
|
|
|
return None |
|
|
|
raise ValueError(f"Unknown address type {addrtype} for {net}") |
|
|
|
wprog = addr |
|
|
|
return tagged('f', bitstring.pack("uint:5", wver) + wprog) |
|
|
|
|
|
|
|
|
|
|
|
def parse_fallback(fallback, net: Type[AbstractNet]): |
|
|
|
wver = fallback[0:5].uint |
|
|
|
if wver == 17: |
|
|
|
addr = hash160_to_b58_address(fallback[5:].tobytes(), net.ADDRTYPE_P2PKH) |
|
|
|
elif wver == 18: |
|
|
|
addr = hash160_to_b58_address(fallback[5:].tobytes(), net.ADDRTYPE_P2SH) |
|
|
|
elif wver <= 16: |
|
|
|
witprog = fallback[5:] # cut witver |
|
|
|
witprog = witprog[:len(witprog) // 8 * 8] # can only be full bytes |
|
|
|
witprog = witprog.tobytes() |
|
|
|
addr = segwit_addr.encode_segwit_address(net.SEGWIT_HRP, wver, witprog) |
|
|
|
else: |
|
|
|
addr=fallback.tobytes() |
|
|
|
return None |
|
|
|
return addr |
|
|
|
|
|
|
|
|
|
|
|
# Map of classical and witness address prefixes |
|
|
|
base58_prefix_map = { |
|
|
|
constants.BitcoinMainnet.SEGWIT_HRP : (constants.BitcoinMainnet.ADDRTYPE_P2PKH, constants.BitcoinMainnet.ADDRTYPE_P2SH), |
|
|
|
constants.BitcoinTestnet.SEGWIT_HRP : (constants.BitcoinTestnet.ADDRTYPE_P2PKH, constants.BitcoinTestnet.ADDRTYPE_P2SH) |
|
|
|
} |
|
|
|
|
|
|
|
def is_p2pkh(currency, prefix): |
|
|
|
return prefix == base58_prefix_map[currency][0] |
|
|
|
BOLT11_HRP_INV_DICT = {net.BOLT11_HRP: net for net in constants.NETS_LIST} |
|
|
|
|
|
|
|
def is_p2sh(currency, prefix): |
|
|
|
return prefix == base58_prefix_map[currency][1] |
|
|
|
|
|
|
|
# Tagged field containing BitArray |
|
|
|
def tagged(char, l): |
|
|
@ -178,13 +164,10 @@ def pull_tagged(stream): |
|
|
|
return (CHARSET[tag], stream.read(length * 5), stream) |
|
|
|
|
|
|
|
def lnencode(addr: 'LnAddr', privkey) -> str: |
|
|
|
# see https://github.com/lightningnetwork/lightning-rfc/pull/844 |
|
|
|
if constants.net.NET_NAME == "signet": |
|
|
|
addr.currency = constants.net.BOLT11_HRP |
|
|
|
if addr.amount: |
|
|
|
amount = addr.currency + shorten_amount(addr.amount) |
|
|
|
amount = addr.net.BOLT11_HRP + shorten_amount(addr.amount) |
|
|
|
else: |
|
|
|
amount = addr.currency if addr.currency else '' |
|
|
|
amount = addr.net.BOLT11_HRP if addr.net else '' |
|
|
|
|
|
|
|
hrp = 'ln' + amount |
|
|
|
|
|
|
@ -221,7 +204,7 @@ def lnencode(addr: 'LnAddr', privkey) -> str: |
|
|
|
route = bitstring.BitArray(pubkey) + bitstring.pack('intbe:32', feebase) + bitstring.pack('intbe:32', feerate) + bitstring.pack('intbe:16', cltv) |
|
|
|
data += tagged('t', route) |
|
|
|
elif k == 'f': |
|
|
|
data += encode_fallback(v, addr.currency) |
|
|
|
data += encode_fallback(v, addr.net) |
|
|
|
elif k == 'd': |
|
|
|
# truncate to max length: 1024*5 bits = 639 bytes |
|
|
|
data += tagged_bytes('d', v.encode()[0:639]) |
|
|
@ -270,7 +253,7 @@ def lnencode(addr: 'LnAddr', privkey) -> str: |
|
|
|
|
|
|
|
|
|
|
|
class LnAddr(object): |
|
|
|
def __init__(self, *, paymenthash: bytes = None, amount=None, currency=None, tags=None, date=None, |
|
|
|
def __init__(self, *, paymenthash: bytes = None, amount=None, net: Type[AbstractNet] = None, tags=None, date=None, |
|
|
|
payment_secret: bytes = None): |
|
|
|
self.date = int(time.time()) if not date else int(date) |
|
|
|
self.tags = [] if not tags else tags |
|
|
@ -279,7 +262,7 @@ class LnAddr(object): |
|
|
|
self.payment_secret = payment_secret |
|
|
|
self.signature = None |
|
|
|
self.pubkey = None |
|
|
|
self.currency = constants.net.SEGWIT_HRP if currency is None else currency |
|
|
|
self.net = constants.net if net is None else net # type: Type[AbstractNet] |
|
|
|
self._amount = amount # type: Optional[Decimal] # in bitcoins |
|
|
|
self._min_final_cltv_expiry = 18 |
|
|
|
|
|
|
@ -330,7 +313,7 @@ class LnAddr(object): |
|
|
|
def __str__(self): |
|
|
|
return "LnAddr[{}, amount={}{} tags=[{}]]".format( |
|
|
|
hexlify(self.pubkey.serialize()).decode('utf-8') if self.pubkey else None, |
|
|
|
self.amount, self.currency, |
|
|
|
self.amount, self.net.BOLT11_HRP, |
|
|
|
", ".join([k + '=' + str(v) for k, v in self.tags]) |
|
|
|
) |
|
|
|
|
|
|
@ -367,12 +350,9 @@ class SerializableKey: |
|
|
|
def serialize(self): |
|
|
|
return self.pubkey.get_public_key_bytes(True) |
|
|
|
|
|
|
|
def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: |
|
|
|
if expected_hrp is None: |
|
|
|
expected_hrp = constants.net.SEGWIT_HRP |
|
|
|
# see https://github.com/lightningnetwork/lightning-rfc/pull/844 |
|
|
|
if constants.net.NET_NAME == "signet": |
|
|
|
expected_hrp = constants.net.BOLT11_HRP |
|
|
|
def lndecode(invoice: str, *, verbose=False, net=None) -> LnAddr: |
|
|
|
if net is None: |
|
|
|
net = constants.net |
|
|
|
decoded_bech32 = bech32_decode(invoice, ignore_long_length=True) |
|
|
|
hrp = decoded_bech32.hrp |
|
|
|
data = decoded_bech32.data |
|
|
@ -387,8 +367,8 @@ def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: |
|
|
|
if not hrp.startswith('ln'): |
|
|
|
raise ValueError("Does not start with ln") |
|
|
|
|
|
|
|
if not hrp[2:].startswith(expected_hrp): |
|
|
|
raise ValueError("Wrong Lightning invoice HRP " + hrp[2:] + ", should be " + expected_hrp) |
|
|
|
if not hrp[2:].startswith(net.BOLT11_HRP): |
|
|
|
raise ValueError(f"Wrong Lightning invoice HRP {hrp[2:]}, should be {net.BOLT11_HRP}") |
|
|
|
|
|
|
|
data = u5_to_bitarray(data) |
|
|
|
|
|
|
@ -403,7 +383,7 @@ def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: |
|
|
|
|
|
|
|
m = re.search("[^\\d]+", hrp[2:]) |
|
|
|
if m: |
|
|
|
addr.currency = m.group(0) |
|
|
|
addr.net = BOLT11_HRP_INV_DICT[m.group(0)] |
|
|
|
amountstr = hrp[2+m.end():] |
|
|
|
# BOLT #11: |
|
|
|
# |
|
|
@ -453,7 +433,7 @@ def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: |
|
|
|
s.read(16).uintbe) |
|
|
|
addr.tags.append(('t', e)) |
|
|
|
elif tag == 'f': |
|
|
|
fallback = parse_fallback(tagdata, addr.currency) |
|
|
|
fallback = parse_fallback(tagdata, addr.net) |
|
|
|
if fallback: |
|
|
|
addr.tags.append(('f', fallback)) |
|
|
|
else: |
|
|
@ -532,13 +512,3 @@ def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: |
|
|
|
addr.pubkey = SerializableKey(ecc.ECPubkey.from_sig_string(sigdecoded[:64], sigdecoded[64], hrp_hash)) |
|
|
|
|
|
|
|
return addr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
# run using |
|
|
|
# python3 -m electrum.lnaddr <invoice> <expected hrp> |
|
|
|
# python3 -m electrum.lnaddr lntb1n1pdlcakepp5e7rn0knl0gm46qqp9eqdsza2c942d8pjqnwa5903n39zu28sgk3sdq423jhxapqv3hkuct5d9hkucqp2rzjqwyx8nu2hygyvgc02cwdtvuxe0lcxz06qt3lpsldzcdr46my5epmj9vk9sqqqlcqqqqqqqlgqqqqqqgqjqdhnmkgahfaynuhe9md8k49xhxuatnv6jckfmsjq8maxta2l0trh5sdrqlyjlwutdnpd5gwmdnyytsl9q0dj6g08jacvthtpeg383k0sq542rz2 tb1n |
|
|
|
import sys |
|
|
|
print(lndecode(sys.argv[1], expected_hrp=sys.argv[2])) |
|
|
|