|
|
@ -941,7 +941,7 @@ class PSBTOutputType(IntEnum): |
|
|
|
|
|
|
|
|
|
|
|
# Serialization/deserialization tools |
|
|
|
def deser_compact_size(f): |
|
|
|
def deser_compact_size(f) -> Optional[int]: |
|
|
|
try: |
|
|
|
nit = f.read(1)[0] |
|
|
|
except IndexError: |
|
|
@ -977,22 +977,37 @@ class PSBTSection: |
|
|
|
raise UnexpectedEndOfStream() |
|
|
|
|
|
|
|
full_key = fd.read(key_size) |
|
|
|
key_type, key = cls.get_keytype_and_key_from_fullkey(full_key) |
|
|
|
|
|
|
|
val_size = deser_compact_size(fd) |
|
|
|
if val_size is None: raise UnexpectedEndOfStream() |
|
|
|
val = fd.read(val_size) |
|
|
|
|
|
|
|
key_type = full_key[0] |
|
|
|
key = full_key[1:] |
|
|
|
return key_type, key, val |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def create_psbt_writer(cls, fd): |
|
|
|
def wr(ktype: int, val: bytes, key: bytes = b''): |
|
|
|
fd.write(bytes.fromhex(var_int(1 + len(key)))) |
|
|
|
fd.write(bytes([ktype]) + key) |
|
|
|
fd.write(bytes.fromhex(var_int(len(val)))) |
|
|
|
fd.write(val) |
|
|
|
def wr(key_type: int, val: bytes, key: bytes = b''): |
|
|
|
full_key = cls.get_fullkey_from_keytype_and_key(key_type, key) |
|
|
|
fd.write(bytes.fromhex(var_int(len(full_key)))) # key_size |
|
|
|
fd.write(full_key) # key |
|
|
|
fd.write(bytes.fromhex(var_int(len(val)))) # val_size |
|
|
|
fd.write(val) # val |
|
|
|
return wr |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]: |
|
|
|
with io.BytesIO(full_key) as key_stream: |
|
|
|
key_type = deser_compact_size(key_stream) |
|
|
|
if key_type is None: raise UnexpectedEndOfStream() |
|
|
|
key = key_stream.read() |
|
|
|
return key_type, key |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: |
|
|
|
key_type_bytes = bytes.fromhex(var_int(key_type)) |
|
|
|
return key_type_bytes + key |
|
|
|
|
|
|
|
def _serialize_psbt_section(self, fd): |
|
|
|
wr = self.create_psbt_writer(fd) |
|
|
|
self.serialize_psbt_section_kvs(wr) |
|
|
@ -1141,7 +1156,7 @@ class PartialTxInput(TxInput, PSBTSection): |
|
|
|
if key: raise SerializationError(f"key for {repr(kt)} must be empty") |
|
|
|
else: |
|
|
|
assert kt not in list(PSBTInputType) |
|
|
|
full_key = bytes([kt]) + key |
|
|
|
full_key = self.get_fullkey_from_keytype_and_key(kt, key) |
|
|
|
if full_key in self._unknown: |
|
|
|
raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}') |
|
|
|
self._unknown[full_key] = val |
|
|
@ -1167,7 +1182,8 @@ class PartialTxInput(TxInput, PSBTSection): |
|
|
|
if self.witness is not None: |
|
|
|
wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness) |
|
|
|
for full_key, val in sorted(self._unknown.items()): |
|
|
|
wr(full_key[0], val, key=full_key[1:]) |
|
|
|
key_type, key = self.get_keytype_and_key_from_fullkey(full_key) |
|
|
|
wr(key_type, val, key=key) |
|
|
|
|
|
|
|
def value_sats(self) -> Optional[int]: |
|
|
|
if self._trusted_value_sats is not None: |
|
|
@ -1364,7 +1380,7 @@ class PartialTxOutput(TxOutput, PSBTSection): |
|
|
|
self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val) |
|
|
|
else: |
|
|
|
assert kt not in list(PSBTOutputType) |
|
|
|
full_key = bytes([kt]) + key |
|
|
|
full_key = self.get_fullkey_from_keytype_and_key(kt, key) |
|
|
|
if full_key in self._unknown: |
|
|
|
raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}') |
|
|
|
self._unknown[full_key] = val |
|
|
@ -1378,7 +1394,8 @@ class PartialTxOutput(TxOutput, PSBTSection): |
|
|
|
packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k]) |
|
|
|
wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k) |
|
|
|
for full_key, val in sorted(self._unknown.items()): |
|
|
|
wr(full_key[0], val, key=full_key[1:]) |
|
|
|
key_type, key = self.get_keytype_and_key_from_fullkey(full_key) |
|
|
|
wr(key_type, val, key=key) |
|
|
|
|
|
|
|
def combine_with_other_txout(self, other_txout: 'TxOutput') -> None: |
|
|
|
assert self.scriptpubkey == other_txout.scriptpubkey |
|
|
@ -1486,7 +1503,7 @@ class PartialTransaction(Transaction): |
|
|
|
tx.xpubs[bip32node] = xfp, path |
|
|
|
else: |
|
|
|
assert kt not in list(PSBTGlobalType) |
|
|
|
full_key = bytes([kt]) + key |
|
|
|
full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key) |
|
|
|
if full_key in tx._unknown: |
|
|
|
raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}') |
|
|
|
tx._unknown[full_key] = val |
|
|
@ -1532,7 +1549,8 @@ class PartialTransaction(Transaction): |
|
|
|
val = pack_bip32_root_fingerprint_and_int_path(xfp, path) |
|
|
|
wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes()) |
|
|
|
for full_key, val in sorted(self._unknown.items()): |
|
|
|
wr(full_key[0], val, key=full_key[1:]) |
|
|
|
key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key) |
|
|
|
wr(key_type, val, key=key) |
|
|
|
fd.write(b'\x00') # section-separator |
|
|
|
# input sections |
|
|
|
for inp in self._inputs: |
|
|
|