Browse Source

Add merkle library with tests

patch-2
Neil Booth 7 years ago
parent
commit
af45363eeb
  1. 154
      electrumx/lib/merkle.py
  2. 143
      tests/lib/test_merkle.py

154
electrumx/lib/merkle.py

@ -0,0 +1,154 @@
# Copyright (c) 2018, Neil Booth
#
# All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# and warranty status of this software.
'''Merkle trees, branches, proofs and roots.'''
from math import ceil, log
from electrumx.lib.hash import double_sha256
class Merkle(object):
'''Perform merkle tree calculations on binary hashes using a given hash
function.
If the hash count is not even, the final hash is repeated when
calculating the next merkle layer up the tree.
'''
def __init__(self, hash_func=double_sha256):
self.hash_func = hash_func
def tree_depth(self, hash_count):
return self.branch_length(hash_count) + 1
def branch_length(self, hash_count):
'''Return the length of a merkle branch given the number of hashes.'''
if not isinstance(hash_count, int):
raise TypeError('hash_count must be an integer')
if hash_count < 1:
raise ValueError('hash_count must be at least 1')
return ceil(log(hash_count, 2))
def branch(self, hashes, index, length=None):
'''Return a (merkle branch, merkle_root) pair given hashes, and the
index of one of those hashes.
'''
hashes = list(hashes)
if not isinstance(index, int):
raise TypeError('index must be an integer')
# This also asserts hashes is not empty
if not 0 <= index < len(hashes):
raise ValueError('index out of range')
natural_length = self.branch_length(len(hashes))
if length is None:
length = natural_length
else:
if not isinstance(length, int):
raise TypeError('length must be an integer')
if length < natural_length:
raise ValueError('length out of range')
hash_func = self.hash_func
branch = []
for _ in range(length):
if len(hashes) & 1:
hashes.append(hashes[-1])
branch.append(hashes[index ^ 1])
index >>= 1
hashes = [hash_func(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return branch, hashes[0]
def root(self, hashes, length=None):
'''Return the merkle root of a non-empty iterable of binary hashes.'''
branch, root = self.branch(hashes, 0, length)
return root
def root_from_proof(self, hash, branch, index):
'''Return the merkle root given a hash, a merkle branch to it, and
its index in the hashes array.
branch is an iterable sorted deepest to shallowest. If the
returned root is the expected value then the merkle proof is
verified.
The caller should have confirmed the length of the branch with
branch_length(). Unfortunately this is not easily done for
bitcoin transactions as the number of transactions in a block
is unknown to an SPV client.
'''
hash_func = self.hash_func
for elt in branch:
if index & 1:
hash = hash_func(elt + hash)
else:
hash = hash_func(hash + elt)
index >>= 1
if index:
raise ValueError('index out of range for branch')
return hash
def level(self, hashes, depth_higher):
'''Return a level of the merkle tree of hashes the given depth
higher than the bottom row of the original tree.'''
size = 1 << depth_higher
root = self.root
return [root(hashes[n: n + size], depth_higher)
for n in range(0, len(hashes), size)]
def branch_from_level(self, level, leaf_hashes, index, depth_higher):
'''Return a (merkle branch, merkle_root) pair when a merkle-tree has a
level cached.
To maximally reduce the amount of data hashed in computing a
markle branch, cache a tree of depth N at level N // 2.
level is a list of hashes in the middle of the tree (returned
by level())
leaf_hashes are the leaves needed to calculate a partial branch
up to level.
depth_higher is how much higher level is than the leaves of the tree
index is the index in the full list of hashes of the hash whose
merkle branch we want.
'''
if not isinstance(level, list):
raise TypeError("level must be a list")
if not isinstance(leaf_hashes, list):
raise TypeError("level must be a list")
leaf_index = (index >> depth_higher) << depth_higher
leaf_branch, leaf_root = self.branch(leaf_hashes, index - leaf_index,
depth_higher)
index >>= depth_higher
level_branch, root = self.branch(level, index)
# Check last so that we know index is in-range
if leaf_root != level[index]:
raise ValueError('leaf hashes inconsistent with level')
return leaf_branch + level_branch, root

143
tests/lib/test_merkle.py

@ -0,0 +1,143 @@
import pytest
from electrumx.lib.merkle import Merkle
Merkle = Merkle()
hashes = [Merkle.hash_func(bytes([x])) for x in range(8)]
roots = [
b'\x14\x06\xe0X\x81\xe2\x996wf\xd3\x13\xe2l\x05VN\xc9\x1b\xf7!\xd3\x17&\xbdnF\xe6\x06\x89S\x9a',
b'K\xbe\x83\xbc8\xeb\xe2\xbc\xc7R\r#A9\xdf\x1c\x0e\xb9\xff\xa5\x1f\x83\xea\xb1\xc5\x12\x9b[\x90kvU',
b'\xe1)\xdf\xe0/V\x7f\xc6\x12\xd1&YmC@aD\xf4\nw\x18\x10\xacqCB\x1d-\xf3\xe5\xc1\xd0',
b'\xe3/W\x01\xa0\x11Z+M\xc7/Rj\xf1aLY,\x19\xee\x95\xcf\xcb\x055\x96\x1e\x07g\xba\xf7\x8e',
b'\xf4\x118I\xd6(\xf7\xc3\xbc\x91\xcc\x0f\xf7\x85\xa6\xae\xe3\xee#l\x1c\x91+(\xcc\t\xc4O\x9f\x97\xb7H',
b'\xfb[\xb7\xe4\x82Y"\xea\xe8\xc2\xba\xec\x96\x0c\x8fR3\x84R"\x13Jj=\x84\x0e<\x12\x01\xafu\xed',
b'}\xe6\\}W\xcd\xc7)q\xc9\xbe\xab\x94\xafj\xd4\xe9\x9f#?\xb6\xcc\xeb\xd2\xb4\xb1\x9f\x13i|\xa5M',
b'o\x97(*\xb3G\xa2e\xae3\x83\xe1V\x9eb\xda\x8c\x19\xa6\x8c\xfag\r+az\x7f\xedGD\xbb\xfe'
]
def test_branch_length():
assert Merkle.branch_length(1) == 0
assert Merkle.branch_length(2) == 1
for n in range(3, 5):
assert Merkle.branch_length(n) == 2
for n in range(5, 9):
assert Merkle.branch_length(n) == 3
def test_branch_length_bad():
with pytest.raises(TypeError):
Merkle.branch_length(1.0)
for n in (-1, 0):
with pytest.raises(ValueError):
Merkle.branch_length(n)
def test_tree_depth():
for n in range(1, 10):
assert Merkle.tree_depth(n) == Merkle.branch_length(n) + 1
def test_root():
for n in range(len(hashes)):
assert Merkle.root(hashes[:n + 1]) == roots[n]
def test_root_bad():
with pytest.raises(TypeError):
Merkle.root(0)
with pytest.raises(ValueError):
Merkle.root([])
def test_branch_and_root_from_proof():
for n in range(len(hashes)):
for m in range(n + 1):
branch, root = Merkle.branch(hashes[:n + 1], m)
assert root == roots[n]
root = Merkle.root_from_proof(hashes[m], branch, m)
assert root == roots[n]
def test_branch_bad():
with pytest.raises(TypeError):
Merkle.branch(0, 0)
with pytest.raises(ValueError):
Merkle.branch([], 0)
with pytest.raises(TypeError):
Merkle.branch(hashes, 0.0)
with pytest.raises(ValueError):
Merkle.branch(hashes[:2], -1)
with pytest.raises(ValueError):
Merkle.branch(hashes[:2], 2)
Merkle.branch(hashes, 0, 3)
with pytest.raises(TypeError):
Merkle.branch(hashes, 0, 3.0)
with pytest.raises(ValueError):
Merkle.branch(hashes, 0, 2)
def test_root_from_proof_bad():
with pytest.raises(TypeError):
Merkle.root_from_proof(0, hashes[:2], 0)
with pytest.raises(TypeError):
Merkle.root_from_proof(hashes[0], hashes[0], 0)
with pytest.raises(ValueError):
Merkle.root_from_proof(hashes[0], hashes[:3], -1)
with pytest.raises(ValueError):
Merkle.root_from_proof(hashes[0], hashes[:3], 8)
def test_level():
for n in range(len(hashes)):
depth = Merkle.tree_depth(n + 1)
for depth_higher in range(0, depth):
level = Merkle.level(hashes[:n + 1], depth_higher)
if depth_higher == 0:
assert level == hashes[:n + 1]
if depth_higher == depth:
assert level == [roots[n]]
# Check raising from level to root works
assert Merkle.root(level) == roots[n]
def test_branch_from_level():
# For all sub-trees
for n in range(0, len(hashes)):
part = hashes[:n + 1]
# For all depths in sub-tree
for depth_higher in range(0, Merkle.tree_depth(len(part))):
level = Merkle.level(part, depth_higher)
# For each hash in sub-tree
for index, hash in enumerate(part):
leaf_index = (index >> depth_higher) << depth_higher
leaf_hashes = part[leaf_index:
leaf_index + (1 << depth_higher)]
branch = Merkle.branch(part, index)
branch2 = Merkle.branch_from_level(level, leaf_hashes,
index, depth_higher)
assert branch == branch2
def test_branch_from_level_bad():
with pytest.raises(TypeError):
Merkle.branch_from_level(hashes[0], hashes, 0, 0)
with pytest.raises(TypeError):
Merkle.branch_from_level(hashes, hashes[0], 0, 0)
Merkle.branch_from_level(hashes, [hashes[0]], 0, 0)
with pytest.raises(ValueError):
Merkle.branch_from_level(hashes, [hashes[0]], -1, 0)
with pytest.raises(TypeError):
Merkle.branch_from_level(hashes, hashes, 0.0, 0)
with pytest.raises(ValueError):
Merkle.branch_from_level(hashes, [hashes[0]], 0, -1)
with pytest.raises(ValueError):
Merkle.branch_from_level(hashes, [hashes[0]], 0, 1)
with pytest.raises(ValueError):
# Inconsistent hash
Merkle.branch_from_level(hashes, [hashes[1]], 0, 0)
with pytest.raises(ValueError):
# Inconsistent hash
Merkle.branch_from_level(hashes, [hashes[0]], 1, 0)
Loading…
Cancel
Save