|
|
@ -48,7 +48,7 @@ from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160, |
|
|
|
var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN, |
|
|
|
int_to_hex, push_script, b58_address_to_hash160, |
|
|
|
opcodes, add_number_to_script, base_decode, is_segwit_script_type, |
|
|
|
base_encode) |
|
|
|
base_encode, construct_witness, construct_script) |
|
|
|
from .crypto import sha256d |
|
|
|
from .logging import get_logger |
|
|
|
|
|
|
@ -243,6 +243,11 @@ class TxInput: |
|
|
|
n = vds.read_compact_size() |
|
|
|
return list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) |
|
|
|
|
|
|
|
def is_segwit(self, *, guess_for_address=False) -> bool: |
|
|
|
if self.witness not in (b'\x00', b'', None): |
|
|
|
return True |
|
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
class BCDataStream(object): |
|
|
|
"""Workalike python implementation of Bitcoin's CDataStream class.""" |
|
|
@ -479,18 +484,6 @@ def parse_input(vds: BCDataStream) -> TxInput: |
|
|
|
return TxInput(prevout=prevout, script_sig=script_sig, nsequence=nsequence) |
|
|
|
|
|
|
|
|
|
|
|
def construct_witness(items: Sequence[Union[str, int, bytes]]) -> str: |
|
|
|
"""Constructs a witness from the given stack items.""" |
|
|
|
witness = var_int(len(items)) |
|
|
|
for item in items: |
|
|
|
if type(item) is int: |
|
|
|
item = bitcoin.script_num_to_hex(item) |
|
|
|
elif isinstance(item, (bytes, bytearray)): |
|
|
|
item = bh2u(item) |
|
|
|
witness += bitcoin.witness_push(item) |
|
|
|
return witness |
|
|
|
|
|
|
|
|
|
|
|
def parse_witness(vds: BCDataStream, txin: TxInput) -> None: |
|
|
|
n = vds.read_compact_size() |
|
|
|
witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n)) |
|
|
@ -512,10 +505,7 @@ def parse_output(vds: BCDataStream) -> TxOutput: |
|
|
|
def multisig_script(public_keys: Sequence[str], m: int) -> str: |
|
|
|
n = len(public_keys) |
|
|
|
assert 1 <= m <= n <= 15, f'm {m}, n {n}' |
|
|
|
op_m = bh2u(add_number_to_script(m)) |
|
|
|
op_n = bh2u(add_number_to_script(n)) |
|
|
|
keylist = [push_script(k) for k in public_keys] |
|
|
|
return op_m + ''.join(keylist) + op_n + opcodes.OP_CHECKMULTISIG.hex() |
|
|
|
return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -650,8 +640,8 @@ class Transaction: |
|
|
|
assert isinstance(txin, PartialTxInput) |
|
|
|
|
|
|
|
_type = txin.script_type |
|
|
|
if not cls.is_segwit_input(txin): |
|
|
|
return '00' |
|
|
|
if not txin.is_segwit(): |
|
|
|
return construct_witness([]) |
|
|
|
|
|
|
|
if _type in ('address', 'unknown') and estimate_size: |
|
|
|
_type = cls.guess_txintype_from_address(txin.address) |
|
|
@ -660,28 +650,11 @@ class Transaction: |
|
|
|
return construct_witness([sig_list[0], pubkeys[0]]) |
|
|
|
elif _type in ['p2wsh', 'p2wsh-p2sh']: |
|
|
|
witness_script = multisig_script(pubkeys, txin.num_sig) |
|
|
|
return construct_witness([0] + sig_list + [witness_script]) |
|
|
|
return construct_witness([0, *sig_list, witness_script]) |
|
|
|
elif _type in ['p2pk', 'p2pkh', 'p2sh']: |
|
|
|
return '00' |
|
|
|
return construct_witness([]) |
|
|
|
raise UnknownTxinType(f'cannot construct witness for txin_type: {_type}') |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def is_segwit_input(cls, txin: 'TxInput', *, guess_for_address=False) -> bool: |
|
|
|
if txin.witness not in (b'\x00', b'', None): |
|
|
|
return True |
|
|
|
if not isinstance(txin, PartialTxInput): |
|
|
|
return False |
|
|
|
if txin.is_native_segwit() or txin.is_p2sh_segwit(): |
|
|
|
return True |
|
|
|
if txin.is_native_segwit() is False and txin.is_p2sh_segwit() is False: |
|
|
|
return False |
|
|
|
if txin.witness_script: |
|
|
|
return True |
|
|
|
_type = txin.script_type |
|
|
|
if _type == 'address' and guess_for_address: |
|
|
|
_type = cls.guess_txintype_from_address(txin.address) |
|
|
|
return is_segwit_script_type(_type) |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def guess_txintype_from_address(cls, addr: Optional[str]) -> str: |
|
|
|
# It's not possible to tell the script type in general |
|
|
@ -714,47 +687,46 @@ class Transaction: |
|
|
|
assert isinstance(txin, PartialTxInput) |
|
|
|
|
|
|
|
if txin.is_p2sh_segwit() and txin.redeem_script: |
|
|
|
return push_script(txin.redeem_script.hex()) |
|
|
|
return construct_script([txin.redeem_script]) |
|
|
|
if txin.is_native_segwit(): |
|
|
|
return '' |
|
|
|
|
|
|
|
_type = txin.script_type |
|
|
|
pubkeys, sig_list = self.get_siglist(txin, estimate_size=estimate_size) |
|
|
|
script = ''.join(push_script(x) for x in sig_list) |
|
|
|
if _type in ('address', 'unknown') and estimate_size: |
|
|
|
_type = self.guess_txintype_from_address(txin.address) |
|
|
|
if _type == 'p2pk': |
|
|
|
return script |
|
|
|
return construct_script([sig_list[0]]) |
|
|
|
elif _type == 'p2sh': |
|
|
|
# put op_0 before script |
|
|
|
script = '00' + script |
|
|
|
redeem_script = multisig_script(pubkeys, txin.num_sig) |
|
|
|
script += push_script(redeem_script) |
|
|
|
return script |
|
|
|
return construct_script([0, *sig_list, redeem_script]) |
|
|
|
elif _type == 'p2pkh': |
|
|
|
script += push_script(pubkeys[0]) |
|
|
|
return script |
|
|
|
return construct_script([sig_list[0], pubkeys[0]]) |
|
|
|
elif _type in ['p2wpkh', 'p2wsh']: |
|
|
|
return '' |
|
|
|
elif _type == 'p2wpkh-p2sh': |
|
|
|
redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0]) |
|
|
|
return push_script(redeem_script) |
|
|
|
return construct_script([redeem_script]) |
|
|
|
elif _type == 'p2wsh-p2sh': |
|
|
|
if estimate_size: |
|
|
|
witness_script = '' |
|
|
|
else: |
|
|
|
witness_script = self.get_preimage_script(txin) |
|
|
|
redeem_script = bitcoin.p2wsh_nested_script(witness_script) |
|
|
|
return push_script(redeem_script) |
|
|
|
return construct_script([redeem_script]) |
|
|
|
raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {_type}') |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def get_preimage_script(cls, txin: 'PartialTxInput') -> str: |
|
|
|
if txin.witness_script: |
|
|
|
opcodes_in_witness_script = [x[0] for x in script_GetOp(txin.witness_script)] |
|
|
|
if opcodes.OP_CODESEPARATOR in opcodes_in_witness_script: |
|
|
|
if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.witness_script)]: |
|
|
|
raise Exception('OP_CODESEPARATOR black magic is not supported') |
|
|
|
return txin.witness_script.hex() |
|
|
|
if not txin.is_segwit() and txin.redeem_script: |
|
|
|
if opcodes.OP_CODESEPARATOR in [x[0] for x in script_GetOp(txin.redeem_script)]: |
|
|
|
raise Exception('OP_CODESEPARATOR black magic is not supported') |
|
|
|
return txin.redeem_script.hex() |
|
|
|
|
|
|
|
pubkeys = [pk.hex() for pk in txin.pubkeys] |
|
|
|
if txin.script_type in ['p2sh', 'p2wsh', 'p2wsh-p2sh']: |
|
|
@ -790,7 +762,7 @@ class Transaction: |
|
|
|
hashOutputs=hashOutputs) |
|
|
|
|
|
|
|
def is_segwit(self, *, guess_for_address=False): |
|
|
|
return any(self.is_segwit_input(txin, guess_for_address=guess_for_address) |
|
|
|
return any(txin.is_segwit(guess_for_address=guess_for_address) |
|
|
|
for txin in self.inputs()) |
|
|
|
|
|
|
|
def invalidate_ser_cache(self): |
|
|
@ -848,7 +820,7 @@ class Transaction: |
|
|
|
def txid(self) -> Optional[str]: |
|
|
|
if self._cached_txid is None: |
|
|
|
self.deserialize() |
|
|
|
all_segwit = all(self.is_segwit_input(x) for x in self.inputs()) |
|
|
|
all_segwit = all(txin.is_segwit() for txin in self.inputs()) |
|
|
|
if not all_segwit and not self.is_complete(): |
|
|
|
return None |
|
|
|
try: |
|
|
@ -892,7 +864,7 @@ class Transaction: |
|
|
|
script = cls.input_script(txin, estimate_size=True) |
|
|
|
input_size = len(cls.serialize_input(txin, script)) // 2 |
|
|
|
|
|
|
|
if cls.is_segwit_input(txin, guess_for_address=True): |
|
|
|
if txin.is_segwit(guess_for_address=True): |
|
|
|
witness_size = len(cls.serialize_witness(txin, estimate_size=True)) // 2 |
|
|
|
else: |
|
|
|
witness_size = 1 if is_segwit_tx else 0 |
|
|
@ -1219,7 +1191,7 @@ class PartialTxInput(TxInput, PSBTSection): |
|
|
|
# without verifying the input amount. This means, given a maliciously modified PSBT, |
|
|
|
# for non-segwit inputs, we might end up burning coins as miner fees. |
|
|
|
if for_signing and False: |
|
|
|
if not Transaction.is_segwit_input(self) and self.witness_utxo: |
|
|
|
if not self.is_segwit() and self.witness_utxo: |
|
|
|
raise PSBTInputConsistencyFailure(f"PSBT input validation: " |
|
|
|
f"If a witness UTXO is provided, no non-witness signature may be created") |
|
|
|
if self.redeem_script and self.address: |
|
|
@ -1359,7 +1331,7 @@ class PartialTxInput(TxInput, PSBTSection): |
|
|
|
return True |
|
|
|
if self.is_coinbase_input(): |
|
|
|
return True |
|
|
|
if self.script_sig is not None and not Transaction.is_segwit_input(self): |
|
|
|
if self.script_sig is not None and not self.is_segwit(): |
|
|
|
return True |
|
|
|
signatures = list(self.part_sigs.values()) |
|
|
|
s = len(signatures) |
|
|
@ -1461,6 +1433,20 @@ class PartialTxInput(TxInput, PSBTSection): |
|
|
|
self._is_p2sh_segwit = calc_if_p2sh_segwit_now() |
|
|
|
return self._is_p2sh_segwit |
|
|
|
|
|
|
|
def is_segwit(self, *, guess_for_address=False) -> bool: |
|
|
|
if super().is_segwit(): |
|
|
|
return True |
|
|
|
if self.is_native_segwit() or self.is_p2sh_segwit(): |
|
|
|
return True |
|
|
|
if self.is_native_segwit() is False and self.is_p2sh_segwit() is False: |
|
|
|
return False |
|
|
|
if self.witness_script: |
|
|
|
return True |
|
|
|
_type = self.script_type |
|
|
|
if _type == 'address' and guess_for_address: |
|
|
|
_type = Transaction.guess_txintype_from_address(self.address) |
|
|
|
return is_segwit_script_type(_type) |
|
|
|
|
|
|
|
def already_has_some_signatures(self) -> bool: |
|
|
|
"""Returns whether progress has been made towards completing this input.""" |
|
|
|
return (self.part_sigs |
|
|
@ -1809,7 +1795,7 @@ class PartialTransaction(Transaction): |
|
|
|
raise Exception("only SIGHASH_ALL signing is supported!") |
|
|
|
nHashType = int_to_hex(sighash, 4) |
|
|
|
preimage_script = self.get_preimage_script(txin) |
|
|
|
if self.is_segwit_input(txin): |
|
|
|
if txin.is_segwit(): |
|
|
|
if bip143_shared_txdigest_fields is None: |
|
|
|
bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields() |
|
|
|
hashPrevouts = bip143_shared_txdigest_fields.hashPrevouts |
|
|
|