|
|
@ -27,7 +27,8 @@ |
|
|
|
|
|
|
|
# Note: The deserialization code originally comes from ABE. |
|
|
|
|
|
|
|
from typing import Sequence, Union, NamedTuple, Tuple, Optional, Iterable |
|
|
|
from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable, |
|
|
|
Callable) |
|
|
|
|
|
|
|
from .util import print_error, profiler |
|
|
|
|
|
|
@ -288,15 +289,39 @@ def script_GetOpName(opcode): |
|
|
|
return (opcodes.whatis(opcode)).replace("OP_", "") |
|
|
|
|
|
|
|
|
|
|
|
class OPPushDataGeneric: |
|
|
|
def __init__(self, pushlen: Callable=None): |
|
|
|
if pushlen is not None: |
|
|
|
self.check_data_len = pushlen |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def check_data_len(cls, datalen: int) -> bool: |
|
|
|
# Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. |
|
|
|
return opcodes.OP_PUSHDATA4 >= datalen >= 0 |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def is_instance(cls, item): |
|
|
|
# accept objects that are instances of this class |
|
|
|
# or other classes that are subclasses |
|
|
|
return isinstance(item, cls) \ |
|
|
|
or (isinstance(item, type) and issubclass(item, cls)) |
|
|
|
|
|
|
|
|
|
|
|
OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65)) |
|
|
|
# note that this does not include x_pubkeys ! |
|
|
|
|
|
|
|
|
|
|
|
def match_decoded(decoded, to_match): |
|
|
|
if decoded is None: |
|
|
|
return False |
|
|
|
if len(decoded) != len(to_match): |
|
|
|
return False |
|
|
|
for i in range(len(decoded)): |
|
|
|
if to_match[i] == opcodes.OP_PUSHDATA4 and decoded[i][0] <= opcodes.OP_PUSHDATA4 and decoded[i][0]>0: |
|
|
|
continue # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent. |
|
|
|
if to_match[i] != decoded[i][0]: |
|
|
|
to_match_item = to_match[i] |
|
|
|
decoded_item = decoded[i] |
|
|
|
if OPPushDataGeneric.is_instance(to_match_item) and to_match_item.check_data_len(decoded_item[0]): |
|
|
|
continue |
|
|
|
if to_match_item != decoded_item[0]: |
|
|
|
return False |
|
|
|
return True |
|
|
|
|
|
|
@ -319,7 +344,7 @@ def parse_scriptSig(d, _bytes): |
|
|
|
bh2u(_bytes)) |
|
|
|
return |
|
|
|
|
|
|
|
match = [ opcodes.OP_PUSHDATA4 ] |
|
|
|
match = [OPPushDataGeneric] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
item = decoded[0][1] |
|
|
|
if item[0] == 0: |
|
|
@ -350,7 +375,7 @@ def parse_scriptSig(d, _bytes): |
|
|
|
# p2pkh TxIn transactions push a signature |
|
|
|
# (71-73 bytes) and then their public key |
|
|
|
# (33 or 65 bytes) onto the stack: |
|
|
|
match = [ opcodes.OP_PUSHDATA4, opcodes.OP_PUSHDATA4 ] |
|
|
|
match = [OPPushDataGeneric, OPPushDataGeneric] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
sig = bh2u(decoded[0][1]) |
|
|
|
x_pubkey = bh2u(decoded[1][1]) |
|
|
@ -370,7 +395,7 @@ def parse_scriptSig(d, _bytes): |
|
|
|
return |
|
|
|
|
|
|
|
# p2sh transaction, m of n |
|
|
|
match = [ opcodes.OP_0 ] + [ opcodes.OP_PUSHDATA4 ] * (len(decoded) - 1) |
|
|
|
match = [opcodes.OP_0] + [OPPushDataGeneric] * (len(decoded) - 1) |
|
|
|
if match_decoded(decoded, match): |
|
|
|
x_sig = [bh2u(x[1]) for x in decoded[1:-1]] |
|
|
|
redeem_script_unsanitized = decoded[-1][1] # for partial multisig txn, this has x_pubkeys |
|
|
@ -393,7 +418,7 @@ def parse_scriptSig(d, _bytes): |
|
|
|
return |
|
|
|
|
|
|
|
# custom partial format for imported addresses |
|
|
|
match = [ opcodes.OP_INVALIDOPCODE, opcodes.OP_0, opcodes.OP_PUSHDATA4 ] |
|
|
|
match = [opcodes.OP_INVALIDOPCODE, opcodes.OP_0, OPPushDataGeneric] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
x_pubkey = bh2u(decoded[2][1]) |
|
|
|
pubkey, address = xpubkey_to_address(x_pubkey) |
|
|
@ -421,7 +446,7 @@ def parse_redeemScript_multisig(redeem_script: bytes): |
|
|
|
raise NotRecognizedRedeemScript() |
|
|
|
op_m = opcodes.OP_1 + m - 1 |
|
|
|
op_n = opcodes.OP_1 + n - 1 |
|
|
|
match_multisig = [ op_m ] + [opcodes.OP_PUSHDATA4]*n + [ op_n, opcodes.OP_CHECKMULTISIG ] |
|
|
|
match_multisig = [op_m] + [OPPushDataGeneric] * n + [op_n, opcodes.OP_CHECKMULTISIG] |
|
|
|
if not match_decoded(dec2, match_multisig): |
|
|
|
raise NotRecognizedRedeemScript() |
|
|
|
x_pubkeys = [bh2u(x[1]) for x in dec2[1:-2]] |
|
|
@ -433,33 +458,36 @@ def parse_redeemScript_multisig(redeem_script: bytes): |
|
|
|
return m, n, x_pubkeys, pubkeys, redeem_script_sanitized |
|
|
|
|
|
|
|
|
|
|
|
def get_address_from_output_script(_bytes, *, net=None): |
|
|
|
def get_address_from_output_script(_bytes: bytes, *, net=None) -> Tuple[int, str]: |
|
|
|
try: |
|
|
|
decoded = [x for x in script_GetOp(_bytes)] |
|
|
|
except MalformedBitcoinScript: |
|
|
|
decoded = None |
|
|
|
|
|
|
|
# The Genesis Block, self-payments, and pay-by-IP-address payments look like: |
|
|
|
# 65 BYTES:... CHECKSIG |
|
|
|
match = [ opcodes.OP_PUSHDATA4, opcodes.OP_CHECKSIG ] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
# p2pk |
|
|
|
match = [OPPushDataPubkey, opcodes.OP_CHECKSIG] |
|
|
|
if match_decoded(decoded, match) and ecc.ECPubkey.is_pubkey_bytes(decoded[0][1]): |
|
|
|
return TYPE_PUBKEY, bh2u(decoded[0][1]) |
|
|
|
|
|
|
|
# Pay-by-Bitcoin-address TxOuts look like: |
|
|
|
# DUP HASH160 20 BYTES:... EQUALVERIFY CHECKSIG |
|
|
|
match = [ opcodes.OP_DUP, opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG ] |
|
|
|
# p2pkh |
|
|
|
match = [opcodes.OP_DUP, opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
return TYPE_ADDRESS, hash160_to_p2pkh(decoded[2][1], net=net) |
|
|
|
|
|
|
|
# p2sh |
|
|
|
match = [ opcodes.OP_HASH160, opcodes.OP_PUSHDATA4, opcodes.OP_EQUAL ] |
|
|
|
match = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
return TYPE_ADDRESS, hash160_to_p2sh(decoded[1][1], net=net) |
|
|
|
|
|
|
|
# segwit address |
|
|
|
possible_witness_versions = [opcodes.OP_0] + list(range(opcodes.OP_1, opcodes.OP_16 + 1)) |
|
|
|
for witver, opcode in enumerate(possible_witness_versions): |
|
|
|
match = [ opcode, opcodes.OP_PUSHDATA4 ] |
|
|
|
# segwit address (version 0) |
|
|
|
match = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
return TYPE_ADDRESS, hash_to_segwit_addr(decoded[1][1], witver=0, net=net) |
|
|
|
|
|
|
|
# segwit address (version 1-16) |
|
|
|
future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1)) |
|
|
|
for witver, opcode in enumerate(future_witness_versions, start=1): |
|
|
|
match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)] |
|
|
|
if match_decoded(decoded, match): |
|
|
|
return TYPE_ADDRESS, hash_to_segwit_addr(decoded[1][1], witver=witver, net=net) |
|
|
|
|
|
|
|