# See the file "LICENSE" for information about the copyright # and warranty status of this software. import array import itertools import os import struct import time from binascii import hexlify, unhexlify from bisect import bisect_right from collections import defaultdict, namedtuple from functools import partial import logging import plyvel from lib.coins import Bitcoin from lib.script import ScriptPubKey # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries HIST_ENTRIES_PER_KEY = 1024 HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 ADDR_TX_HASH_LEN = 4 UTXO_TX_HASH_LEN = 4 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") def to_4_bytes(value): return struct.pack('H', self.flush_count) for hash168, hist in self.history.items(): key = b'H' + hash168 + flush_id batch.put(key, array.array('I', hist).tobytes()) self.logger.info('flushed {:,d} history entries ({:,d} MB)...' .format(self.history_size, self.history_size * 4 // 1048576)) self.history = defaultdict(list) self.history_size = 0 def open_file(self, filename, truncate=False, create=False): try: return open(filename, 'wb+' if truncate else 'rb+') except FileNotFoundError: if create: return open(filename, 'wb+') raise def read_headers(self, height, count): header_len = self.coin.HEADER_LEN self.headers_file.seek(height * header_len) return self.headers_file.read(count * header_len) def write_headers(self): headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN assert len(headers) % header_len == 0 self.headers_file.seek(self.db_height * header_len) self.headers_file.write(headers) self.headers_file.flush() self.headers = [] def write_tx_counts(self): self.txcount_file.seek(self.db_height * self.tx_counts.itemsize) self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1]) self.txcount_file.flush() def write_tx_hashes(self): hash_blob = b''.join(itertools.chain(*self.tx_hashes)) assert len(hash_blob) % 32 == 0 assert self.tx_hash_file_size % 32 == 0 hashes = memoryview(hash_blob) cursor = 0 file_pos = self.db_tx_count * 32 while cursor < len(hashes): file_num, offset = divmod(file_pos, self.tx_hash_file_size) size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) filename = 'hashes{:05d}'.format(file_num) with self.open_file(filename, create=True) as f: f.seek(offset) f.write(hashes[cursor:cursor + size]) cursor += size file_pos += size self.tx_hashes = [] def cache_MB(self): '''Returns the approximate size of the cache, in MB.''' utxo_MB = ((len(self.utxo_cache.cache) + len(self.utxo_cache.db_cache)) * 100 // 1048576) hist_MB = (len(self.history) * 48 + self.history_size * 20) // 1048576 if self.height % 200 == 0: self.logger.info('cache size at height {:,d}: ' 'UTXOs: {:,d} MB history: {:,d} MB' .format(self.height, utxo_MB, hist_MB)) self.logger.info('cache entries: UTXOs: {:,d}/{:,d} ' 'history: {:,d}/{:,d}' .format(len(self.utxo_cache.cache), len(self.utxo_cache.db_cache), len(self.history), self.history_size)) return utxo_MB + hist_MB def process_block(self, block): self.headers.append(block[:self.coin.HEADER_LEN]) tx_hashes, txs = self.coin.read_block(block) self.height += 1 assert len(self.tx_counts) == self.height # These both need to be updated before calling process_tx(). # It uses them for tx hash lookup self.tx_hashes.append(tx_hashes) self.tx_counts.append(self.tx_count + len(txs)) for tx_hash, tx in zip(tx_hashes, txs): self.process_tx(tx_hash, tx) # Flush if we're getting full if self.cache_MB() > self.flush_MB: self.flush() def process_tx(self, tx_hash, tx): cache = self.utxo_cache tx_num = self.tx_count # Add the outputs as new UTXOs; spend the inputs hash168s = cache.add_many(tx_hash, tx_num, tx.outputs) if not tx.is_coinbase: for txin in tx.inputs: hash168s.add(cache.spend(txin.prevout)) for hash168 in hash168s: self.history[hash168].append(tx_num) self.history_size += len(hash168s) self.tx_count += 1 def get_tx_hash(self, tx_num): '''Returns the tx_hash and height of a tx number.''' height = bisect_right(self.tx_counts, tx_num) # Is this on disk or unflushed? if height >= self.db_height: tx_hashes = self.tx_hashes[height - self.db_height] tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] else: file_pos = tx_num * 32 file_num, offset = divmod(file_pos, self.tx_hash_file_size) filename = 'hashes{:05d}'.format(file_num) with self.open_file(filename) as f: f.seek(offset) tx_hash = f.read(32) return tx_hash, height @staticmethod def resolve_limit(limit): if limit is None: return -1 assert isinstance(limit, int) and limit >= 0 return limit def get_history(self, hash168, limit=1000): '''Generator that returns an unpruned, sorted list of (tx_hash, height) tuples of transactions that touched the address, earliest in the blockchain first. Includes both spending and receiving transactions. By default yields at most 1000 entries. Set limit to None to get them all. ''' limit = self.resolve_limit(limit) prefix = b'H' + hash168 for key, hist in self.db.iterator(prefix=prefix): a = array.array('I') a.frombytes(hist) for tx_num in a: if limit == 0: return yield self.get_tx_hash(tx_num) limit -= 1 def get_balance(self, hash168): '''Returns the confirmed balance of an address.''' return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) def get_utxos(self, hash168, limit=1000): '''Generator that yields all UTXOs for an address sorted in no particular order. By default yields at most 1000 entries. Set limit to None to get them all. ''' limit = self.resolve_limit(limit) unpack = struct.unpack prefix = b'u' + hash168 utxos = [] for k, v in self.db.iterator(prefix=prefix): (tx_pos, ) = unpack('