From 83740c1a7848a5b8240a8e26e9ab3d6f1d8ff7d0 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Thu, 7 Nov 2019 03:27:38 +0100 Subject: [PATCH] psbt: implement CompactSize key types (previously single-byte types) based on latest BIP-0174 update: bitcoin/bips#849 --- electrum/bitcoin.py | 1 + electrum/transaction.py | 46 ++++++++++++++++++++++++++++------------- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/electrum/bitcoin.py b/electrum/bitcoin.py index 576961443..8a1f154c3 100644 --- a/electrum/bitcoin.py +++ b/electrum/bitcoin.py @@ -240,6 +240,7 @@ def var_int(i: int) -> str: # https://en.bitcoin.it/wiki/Protocol_specification#Variable_length_integer # https://github.com/bitcoin/bitcoin/blob/efe1ee0d8d7f82150789f1f6840f139289628a2b/src/serialize.h#L247 # "CompactSize" + assert i >= 0, i if i<0xfd: return int_to_hex(i) elif i<=0xffff: diff --git a/electrum/transaction.py b/electrum/transaction.py index 684dbb4f7..0106cbdfc 100644 --- a/electrum/transaction.py +++ b/electrum/transaction.py @@ -941,7 +941,7 @@ class PSBTOutputType(IntEnum): # Serialization/deserialization tools -def deser_compact_size(f): +def deser_compact_size(f) -> Optional[int]: try: nit = f.read(1)[0] except IndexError: @@ -977,22 +977,37 @@ class PSBTSection: raise UnexpectedEndOfStream() full_key = fd.read(key_size) + key_type, key = cls.get_keytype_and_key_from_fullkey(full_key) + val_size = deser_compact_size(fd) + if val_size is None: raise UnexpectedEndOfStream() val = fd.read(val_size) - key_type = full_key[0] - key = full_key[1:] return key_type, key, val @classmethod def create_psbt_writer(cls, fd): - def wr(ktype: int, val: bytes, key: bytes = b''): - fd.write(bytes.fromhex(var_int(1 + len(key)))) - fd.write(bytes([ktype]) + key) - fd.write(bytes.fromhex(var_int(len(val)))) - fd.write(val) + def wr(key_type: int, val: bytes, key: bytes = b''): + full_key = cls.get_fullkey_from_keytype_and_key(key_type, key) + fd.write(bytes.fromhex(var_int(len(full_key)))) # key_size + fd.write(full_key) # key + fd.write(bytes.fromhex(var_int(len(val)))) # val_size + fd.write(val) # val return wr + @classmethod + def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]: + with io.BytesIO(full_key) as key_stream: + key_type = deser_compact_size(key_stream) + if key_type is None: raise UnexpectedEndOfStream() + key = key_stream.read() + return key_type, key + + @classmethod + def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes: + key_type_bytes = bytes.fromhex(var_int(key_type)) + return key_type_bytes + key + def _serialize_psbt_section(self, fd): wr = self.create_psbt_writer(fd) self.serialize_psbt_section_kvs(wr) @@ -1141,7 +1156,7 @@ class PartialTxInput(TxInput, PSBTSection): if key: raise SerializationError(f"key for {repr(kt)} must be empty") else: assert kt not in list(PSBTInputType) - full_key = bytes([kt]) + key + full_key = self.get_fullkey_from_keytype_and_key(kt, key) if full_key in self._unknown: raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}') self._unknown[full_key] = val @@ -1167,7 +1182,8 @@ class PartialTxInput(TxInput, PSBTSection): if self.witness is not None: wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness) for full_key, val in sorted(self._unknown.items()): - wr(full_key[0], val, key=full_key[1:]) + key_type, key = self.get_keytype_and_key_from_fullkey(full_key) + wr(key_type, val, key=key) def value_sats(self) -> Optional[int]: if self._trusted_value_sats is not None: @@ -1364,7 +1380,7 @@ class PartialTxOutput(TxOutput, PSBTSection): self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val) else: assert kt not in list(PSBTOutputType) - full_key = bytes([kt]) + key + full_key = self.get_fullkey_from_keytype_and_key(kt, key) if full_key in self._unknown: raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}') self._unknown[full_key] = val @@ -1378,7 +1394,8 @@ class PartialTxOutput(TxOutput, PSBTSection): packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k]) wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k) for full_key, val in sorted(self._unknown.items()): - wr(full_key[0], val, key=full_key[1:]) + key_type, key = self.get_keytype_and_key_from_fullkey(full_key) + wr(key_type, val, key=key) def combine_with_other_txout(self, other_txout: 'TxOutput') -> None: assert self.scriptpubkey == other_txout.scriptpubkey @@ -1486,7 +1503,7 @@ class PartialTransaction(Transaction): tx.xpubs[bip32node] = xfp, path else: assert kt not in list(PSBTGlobalType) - full_key = bytes([kt]) + key + full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key) if full_key in tx._unknown: raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}') tx._unknown[full_key] = val @@ -1532,7 +1549,8 @@ class PartialTransaction(Transaction): val = pack_bip32_root_fingerprint_and_int_path(xfp, path) wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes()) for full_key, val in sorted(self._unknown.items()): - wr(full_key[0], val, key=full_key[1:]) + key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key) + wr(key_type, val, key=key) fd.write(b'\x00') # section-separator # input sections for inp in self._inputs: