From 1da29305fccdba6b96d1fa599c2afa56f66649ad Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Mon, 19 Oct 2020 22:33:12 +0200 Subject: [PATCH] pyln: Improve zbase32 encoding / decoding - Adds bitarray filling so mesages of any length can be encoded, instead of forcing the encoding to be of messages with length multiple of 5. - Adds checks for encoding / decoding and raises expections if the inputs are not as expected. - Flags functions that are supposed to be internal as "private". --- contrib/pyln-proto/pyln/proto/zbase32.py | 62 ++++++++++++++++++------ 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/contrib/pyln-proto/pyln/proto/zbase32.py b/contrib/pyln-proto/pyln/proto/zbase32.py index 460396fd9..b02029636 100644 --- a/contrib/pyln-proto/pyln/proto/zbase32.py +++ b/contrib/pyln-proto/pyln/proto/zbase32.py @@ -23,34 +23,68 @@ zbase32_revchars = [ ] -def bitarray_to_u5(barr): - assert len(barr) % 5 == 0 +def _message_to_bitarray(message): + barr = bitstring.ConstBitStream(message) + padding_len = 5 - (len(barr) % 5) + if padding_len < 5: + # The bitarray length has to be multiple of 5. If not, it is right-padded with zeros. + barr = bitstring.ConstBitStream(bin="{}{}".format(barr.bin, '0' * padding_len)) + return barr + + +def _bitarray_to_message(barr): + padding_len = len(barr) % 8 + if padding_len > 0: + return bitstring.Bits(bin=barr.bin[:-padding_len]).bytes + else: + return barr.bytes + + +def _bitarray_to_u5(barr): ret = [] - s = bitstring.ConstBitStream(barr) - while s.pos != s.len: - ret.append(s.read(5).uint) + while barr.pos != barr.len: + ret.append(barr.read(5).uint) return ret -def u5_to_bitarray(arr): +def _u5_to_bitarray(arr): ret = bitstring.BitArray() for a in arr: ret += bitstring.pack("uint:5", a) return ret -def encode(b): - uint5s = bitarray_to_u5(b) +def is_zbase32_encoded(message): + if isinstance(message, str): + message = message.encode("ASCII") + elif not isinstance(message, bytes): + raise TypeError("message must be string or bytes") + return set(message).issubset(zbase32_chars) + + +def encode(message): + if isinstance(message, str): + message = message.encode('ASCII') + elif not isinstance(message, bytes): + raise TypeError("message must be string or bytes") + + barr = _message_to_bitarray(message) + uint5s = _bitarray_to_u5(barr) res = [zbase32_chars[c] for c in uint5s] return bytes(res) -def decode(b): - if isinstance(b, str): - b = b.encode('ASCII') +def decode(message): + if isinstance(message, str): + message = message.encode('ASCII') + elif not isinstance(message, bytes): + raise TypeError("message must be string or bytes") + + if not is_zbase32_encoded(message): + raise ValueError("message is not zbase32 encoded") uint5s = [] - for c in b: + for c in message: uint5s.append(zbase32_revchars[c]) - dec = u5_to_bitarray(uint5s) - return dec.bytes + dec = _u5_to_bitarray(uint5s) + return _bitarray_to_message(dec)