|
|
@ -2,6 +2,7 @@ |
|
|
|
# and warranty status of this software. |
|
|
|
|
|
|
|
import array |
|
|
|
import ast |
|
|
|
import itertools |
|
|
|
import os |
|
|
|
import struct |
|
|
@ -275,13 +276,6 @@ class UTXOCache(object): |
|
|
|
|
|
|
|
class DB(object): |
|
|
|
|
|
|
|
HEIGHT_KEY = b'height' |
|
|
|
TIP_KEY = b'tip' |
|
|
|
GENESIS_KEY = b'genesis' |
|
|
|
TX_COUNT_KEY = b'tx_count' |
|
|
|
FLUSH_COUNT_KEY = b'flush_count' |
|
|
|
WALL_TIME_KEY = b'wall_time' |
|
|
|
|
|
|
|
class Error(Exception): |
|
|
|
pass |
|
|
|
|
|
|
@ -289,75 +283,90 @@ class DB(object): |
|
|
|
self.logger = logging.getLogger('DB') |
|
|
|
self.logger.setLevel(logging.INFO) |
|
|
|
|
|
|
|
self.coin = env.coin |
|
|
|
# Meta |
|
|
|
self.tx_hash_file_size = 16 * 1024 * 1024 |
|
|
|
self.flush_MB = env.flush_MB |
|
|
|
self.next_cache_check = 0 |
|
|
|
self.logger.info('flushing after cache reaches {:,d} MB' |
|
|
|
.format(self.flush_MB)) |
|
|
|
self.last_flush = time.time() |
|
|
|
self.coin = env.coin |
|
|
|
|
|
|
|
self.tx_counts = array.array('I') |
|
|
|
self.tx_hash_file_size = 4*1024*1024 |
|
|
|
# Unflushed items. Headers and tx_hashes have one entry per block |
|
|
|
# Chain state (initialize to genesis in case of new DB) |
|
|
|
self.height = -1 |
|
|
|
self.tx_count = 0 |
|
|
|
self.flush_count = 0 |
|
|
|
self.wall_time = 0 |
|
|
|
self.tip = self.coin.GENESIS_HASH |
|
|
|
|
|
|
|
# Open DB and metadata files. Record some of its state. |
|
|
|
self.db = self.open_db(self.coin) |
|
|
|
self.db_tx_count = self.tx_count |
|
|
|
self.db_height = self.height |
|
|
|
|
|
|
|
# Caches to be flushed later. Headers and tx_hashes have one |
|
|
|
# entry per block |
|
|
|
self.headers = [] |
|
|
|
self.tx_hashes = [] |
|
|
|
self.history = defaultdict(partial(array.array, 'I')) |
|
|
|
self.history_size = 0 |
|
|
|
|
|
|
|
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) |
|
|
|
try: |
|
|
|
self.db = self.open_db(db_name, False) |
|
|
|
except: |
|
|
|
self.db = self.open_db(db_name, True) |
|
|
|
self.headers_file = self.open_file('headers', True) |
|
|
|
self.txcount_file = self.open_file('txcount', True) |
|
|
|
self.init_db() |
|
|
|
self.logger.info('created new database {}'.format(db_name)) |
|
|
|
else: |
|
|
|
self.logger.info('successfully opened database {}'.format(db_name)) |
|
|
|
self.headers_file = self.open_file('headers') |
|
|
|
self.txcount_file = self.open_file('txcount') |
|
|
|
self.read_db() |
|
|
|
|
|
|
|
self.utxo_cache = UTXOCache(self, self.db, self.coin) |
|
|
|
self.tx_counts = array.array('I') |
|
|
|
self.txcount_file.seek(0) |
|
|
|
self.tx_counts.fromfile(self.txcount_file, self.height + 1) |
|
|
|
|
|
|
|
# Note that DB_HEIGHT is the height of the next block to be written. |
|
|
|
# So an empty DB has a DB_HEIGHT of 0 not -1. |
|
|
|
self.tx_count = self.db_tx_count |
|
|
|
self.height = self.db_height - 1 |
|
|
|
self.tx_counts.fromfile(self.txcount_file, self.db_height) |
|
|
|
self.last_flush = time.time() |
|
|
|
# FIXME: this sucks and causes issues with exceptions in init_db() |
|
|
|
if self.tx_count == 0: |
|
|
|
self.flush() |
|
|
|
|
|
|
|
def open_db(self, db_name, create): |
|
|
|
return plyvel.DB(db_name, create_if_missing=create, |
|
|
|
error_if_exists=create, compression=None) |
|
|
|
|
|
|
|
def init_db(self): |
|
|
|
self.db_height = 0 |
|
|
|
self.db_tx_count = 0 |
|
|
|
self.flush_count = 0 |
|
|
|
self.wall_time = 0 |
|
|
|
self.tip = self.coin.GENESIS_HASH |
|
|
|
self.db.put(self.GENESIS_KEY, unhexlify(self.tip)) |
|
|
|
|
|
|
|
def read_db(self): |
|
|
|
db = self.db |
|
|
|
genesis_hash = hexlify(db.get(self.GENESIS_KEY)) |
|
|
|
if genesis_hash != self.coin.GENESIS_HASH: |
|
|
|
raise self.Error('DB genesis hash {} does not match coin {}' |
|
|
|
.format(genesis_hash, self.coin.GENESIS_HASH)) |
|
|
|
self.db_height = from_4_bytes(db.get(self.HEIGHT_KEY)) |
|
|
|
self.db_tx_count = from_4_bytes(db.get(self.TX_COUNT_KEY)) |
|
|
|
self.flush_count = from_4_bytes(db.get(self.FLUSH_COUNT_KEY)) |
|
|
|
self.wall_time = from_4_bytes(db.get(self.WALL_TIME_KEY)) |
|
|
|
self.tip = hexlify(db.get(self.TIP_KEY)) |
|
|
|
# Log state |
|
|
|
self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' |
|
|
|
'flush count: {:,d} sync time: {}' |
|
|
|
.format(self.coin.NAME, self.coin.NET, |
|
|
|
self.db_height - 1, self.db_tx_count, |
|
|
|
self.height, self.tx_count, |
|
|
|
self.flush_count, self.formatted_wall_time())) |
|
|
|
self.logger.info('flushing after cache reaches {:,d} MB' |
|
|
|
.format(self.flush_MB)) |
|
|
|
|
|
|
|
def open_db(self, coin): |
|
|
|
self.headers_file = self.open_file('headers', True) |
|
|
|
self.txcount_file = self.open_file('txcount', True) |
|
|
|
is_new = self.headers_file.seek(0, 2) == 0 |
|
|
|
if is_new != (self.txcount_file.seek(0, 2) == 0): |
|
|
|
raise self.Error('just one metadata file is zero-length') |
|
|
|
|
|
|
|
db_name = '{}-{}'.format(coin.NAME, coin.NET) |
|
|
|
db = plyvel.DB(db_name, create_if_missing=is_new, |
|
|
|
error_if_exists=is_new, compression=None) |
|
|
|
if is_new: |
|
|
|
self.logger.info('created new database {}'.format(db_name)) |
|
|
|
self.flush_state(db) |
|
|
|
else: |
|
|
|
self.logger.info('successfully opened database {}'.format(db_name)) |
|
|
|
self.read_state(db) |
|
|
|
return db |
|
|
|
|
|
|
|
def flush_state(self, batch): |
|
|
|
'''Flush chain state to the batch.''' |
|
|
|
now = time.time() |
|
|
|
self.wall_time += now - self.last_flush |
|
|
|
self.last_flush = now |
|
|
|
state = { |
|
|
|
'genesis': self.coin.GENESIS_HASH, |
|
|
|
'height': self.height, |
|
|
|
'tx_count': self.tx_count, |
|
|
|
'tip': self.tip, |
|
|
|
'flush_count': self.flush_count, |
|
|
|
'wall_time': self.wall_time, |
|
|
|
} |
|
|
|
batch.put(b'state', repr(state).encode('ascii')) |
|
|
|
|
|
|
|
def read_state(self, db): |
|
|
|
state = db.get(b'state') |
|
|
|
state = ast.literal_eval(state.decode('ascii')) |
|
|
|
if state['genesis'] != self.coin.GENESIS_HASH: |
|
|
|
raise self.Error('DB genesis hash {} does not match coin {}' |
|
|
|
.format(state['genesis_hash'], |
|
|
|
self.coin.GENESIS_HASH)) |
|
|
|
self.height = state['height'] |
|
|
|
self.tx_count = state['tx_count'] |
|
|
|
self.tip = state['tip'] |
|
|
|
self.flush_count = state['flush_count'] |
|
|
|
self.wall_time = state['wall_time'] |
|
|
|
|
|
|
|
def formatted_wall_time(self): |
|
|
|
wall_time = int(self.wall_time) |
|
|
@ -365,12 +374,12 @@ class DB(object): |
|
|
|
wall_time // 86400, (wall_time % 86400) // 3600, |
|
|
|
(wall_time % 3600) // 60, wall_time % 60) |
|
|
|
|
|
|
|
def flush(self): |
|
|
|
def flush_all(self): |
|
|
|
'''Flush out all cached state.''' |
|
|
|
flush_start = time.time() |
|
|
|
last_flush = self.last_flush |
|
|
|
tx_diff = self.tx_count - self.db_tx_count |
|
|
|
height_diff = self.height + 1 - self.db_height |
|
|
|
height_diff = self.height - self.db_height |
|
|
|
self.logger.info('starting flush {:,d} txs and {:,d} blocks' |
|
|
|
.format(tx_diff, height_diff)) |
|
|
|
|
|
|
@ -386,9 +395,13 @@ class DB(object): |
|
|
|
self.flush_state(batch) |
|
|
|
self.logger.info('committing transaction...') |
|
|
|
|
|
|
|
# The flush succeeded, so update our record of DB state |
|
|
|
self.db_tx_count = self.tx_count |
|
|
|
self.db_height = self.height |
|
|
|
|
|
|
|
# Update and put the wall time again - otherwise we drop the |
|
|
|
# time it takes leveldb to commit the batch |
|
|
|
self.update_wall_time(self.db) |
|
|
|
# time it took leveldb to commit the batch |
|
|
|
self.flush_state(self.db) |
|
|
|
|
|
|
|
flush_time = int(self.last_flush - flush_start) |
|
|
|
self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} ' |
|
|
@ -410,27 +423,11 @@ class DB(object): |
|
|
|
self.write_tx_hashes() |
|
|
|
os.sync() |
|
|
|
|
|
|
|
def update_wall_time(self, dest): |
|
|
|
'''Put the wall time to dest - a DB or batch.''' |
|
|
|
now = time.time() |
|
|
|
self.wall_time += now - self.last_flush |
|
|
|
self.last_flush = now |
|
|
|
dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time))) |
|
|
|
|
|
|
|
def flush_state(self, batch): |
|
|
|
self.db_tx_count = self.tx_count |
|
|
|
self.db_height = self.height + 1 |
|
|
|
batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height)) |
|
|
|
batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count)) |
|
|
|
batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count)) |
|
|
|
batch.put(self.TIP_KEY, unhexlify(self.tip)) |
|
|
|
self.update_wall_time(batch) |
|
|
|
self.flush_count += 1 |
|
|
|
|
|
|
|
def flush_history(self, batch): |
|
|
|
# Drop any None entry |
|
|
|
self.history.pop(None, None) |
|
|
|
|
|
|
|
self.flush_count += 1 |
|
|
|
flush_id = struct.pack('>H', self.flush_count) |
|
|
|
for hash168, hist in self.history.items(): |
|
|
|
key = b'H' + hash168 + flush_id |
|
|
@ -442,9 +439,10 @@ class DB(object): |
|
|
|
self.history = defaultdict(partial(array.array, 'I')) |
|
|
|
self.history_size = 0 |
|
|
|
|
|
|
|
def open_file(self, filename, truncate=False, create=False): |
|
|
|
def open_file(self, filename, create=False): |
|
|
|
'''Open the file name. Return its handle.''' |
|
|
|
try: |
|
|
|
return open(filename, 'wb+' if truncate else 'rb+') |
|
|
|
return open(filename, 'rb+') |
|
|
|
except FileNotFoundError: |
|
|
|
if create: |
|
|
|
return open(filename, 'wb+') |
|
|
@ -459,14 +457,15 @@ class DB(object): |
|
|
|
headers = b''.join(self.headers) |
|
|
|
header_len = self.coin.HEADER_LEN |
|
|
|
assert len(headers) % header_len == 0 |
|
|
|
self.headers_file.seek(self.db_height * header_len) |
|
|
|
self.headers_file.seek((self.db_height + 1) * header_len) |
|
|
|
self.headers_file.write(headers) |
|
|
|
self.headers_file.flush() |
|
|
|
self.headers = [] |
|
|
|
|
|
|
|
def write_tx_counts(self): |
|
|
|
self.txcount_file.seek(self.db_height * self.tx_counts.itemsize) |
|
|
|
self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1]) |
|
|
|
self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) |
|
|
|
self.txcount_file.write(self.tx_counts[self.db_height + 1: |
|
|
|
self.height + 1]) |
|
|
|
self.txcount_file.flush() |
|
|
|
|
|
|
|
def write_tx_hashes(self): |
|
|
@ -529,7 +528,7 @@ class DB(object): |
|
|
|
if now > self.next_cache_check: |
|
|
|
self.next_cache_check = now + 60 |
|
|
|
if self.cache_MB() > self.flush_MB: |
|
|
|
self.flush() |
|
|
|
self.flush_all() |
|
|
|
|
|
|
|
def process_tx(self, tx_hash, tx): |
|
|
|
cache = self.utxo_cache |
|
|
@ -552,13 +551,13 @@ class DB(object): |
|
|
|
height = bisect_right(self.tx_counts, tx_num) |
|
|
|
|
|
|
|
# Is this on disk or unflushed? |
|
|
|
if height >= self.db_height: |
|
|
|
tx_hashes = self.tx_hashes[height - self.db_height] |
|
|
|
if height > self.db_height: |
|
|
|
tx_hashes = self.tx_hashes[height - (self.db_height + 1)] |
|
|
|
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]] |
|
|
|
else: |
|
|
|
file_pos = tx_num * 32 |
|
|
|
file_num, offset = divmod(file_pos, self.tx_hash_file_size) |
|
|
|
filename = 'hashes{:05d}'.format(file_num) |
|
|
|
filename = 'hashes{:04d}'.format(file_num) |
|
|
|
with self.open_file(filename) as f: |
|
|
|
f.seek(offset) |
|
|
|
tx_hash = f.read(32) |
|
|
|