Browse Source

Merge branch 'clean_db' into develop

master
Neil Booth 8 years ago
parent
commit
a5fb9618d6
  1. 54
      server/block_processor.py
  2. 55
      server/db.py

54
server/block_processor.py

@ -10,7 +10,6 @@
import array import array
import asyncio import asyncio
import os
from struct import pack, unpack from struct import pack, unpack
import time import time
from bisect import bisect_left from bisect import bisect_left
@ -22,7 +21,6 @@ from server.version import VERSION
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.util import chunks, formatted_time, LoggedClass from lib.util import chunks, formatted_time, LoggedClass
import server.db import server.db
from server.storage import open_db
class ChainError(Exception): class ChainError(Exception):
@ -152,7 +150,6 @@ class BlockProcessor(server.db.DB):
self.utxo_MB = env.utxo_MB self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB self.hist_MB = env.hist_MB
self.next_cache_check = 0 self.next_cache_check = 0
self.reorg_limit = env.reorg_limit
# Headers and tx_hashes have one entry per block # Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
@ -171,14 +168,11 @@ class BlockProcessor(server.db.DB):
self.db_deletes = [] self.db_deletes = []
# Log state # Log state
self.logger.info('reorg limit is {:,d} blocks'
.format(self.reorg_limit))
if self.first_sync: if self.first_sync:
self.logger.info('flushing UTXO cache at {:,d} MB' self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB)) .format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB' self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB)) .format(self.hist_MB))
self.clean_db()
async def main_loop(self): async def main_loop(self):
'''Main loop for block processing.''' '''Main loop for block processing.'''
@ -295,52 +289,6 @@ class BlockProcessor(server.db.DB):
return self.fs_block_hashes(start, count) return self.fs_block_hashes(start, count)
def clean_db(self):
'''Clean out stale DB items.
Stale DB items are excess history flushed since the most
recent UTXO flush (only happens on unclean shutdown), and aged
undo information.
'''
if self.flush_count < self.utxo_flush_count:
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
with self.db.write_batch() as batch:
if self.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
self.remove_excess_history(batch)
self.utxo_flush_count = self.flush_count
self.remove_stale_undo_items(batch)
self.flush_state(batch)
def remove_excess_history(self, batch):
prefix = b'H'
keys = []
for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def remove_stale_undo_items(self, batch):
prefix = b'U'
cutoff = self.db_height - self.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
self.logger.info('deleting {:,d} stale undo entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def flush_state(self, batch): def flush_state(self, batch):
'''Flush chain state to the batch.''' '''Flush chain state to the batch.'''
now = time.time() now = time.time()
@ -537,7 +485,7 @@ class BlockProcessor(server.db.DB):
self.tip = self.coin.header_hash(header) self.tip = self.coin.header_hash(header)
self.height += 1 self.height += 1
undo_info = self.advance_txs(tx_hashes, txs, touched) undo_info = self.advance_txs(tx_hashes, txs, touched)
if self.daemon.cached_height() - self.height <= self.reorg_limit: if self.daemon.cached_height() - self.height <= self.env.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info)) self.write_undo_info(self.height, b''.join(undo_info))
def advance_txs(self, tx_hashes, txs, touched): def advance_txs(self, tx_hashes, txs, touched):

55
server/db.py

@ -47,6 +47,8 @@ class DB(LoggedClass):
self.logger.info('switching current directory to {}' self.logger.info('switching current directory to {}'
.format(env.db_dir)) .format(env.db_dir))
os.chdir(env.db_dir) os.chdir(env.db_dir)
self.logger.info('reorg limit is {:,d} blocks'
.format(self.env.reorg_limit))
# Open DB and metadata files. Record some of its state. # Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET) db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
@ -73,6 +75,7 @@ class DB(LoggedClass):
assert self.db_tx_count == self.tx_counts[-1] assert self.db_tx_count == self.tx_counts[-1]
else: else:
assert self.db_tx_count == 0 assert self.db_tx_count == 0
self.clean_db()
def read_state(self): def read_state(self):
if self.db.is_new: if self.db.is_new:
@ -117,6 +120,8 @@ class DB(LoggedClass):
if self.first_sync: if self.first_sync:
self.logger.info('sync time so far: {}' self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time))) .format(formatted_time(self.wall_time)))
if self.flush_count < self.utxo_flush_count:
raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
def write_state(self, batch): def write_state(self, batch):
'''Write chain state to the batch.''' '''Write chain state to the batch.'''
@ -133,6 +138,56 @@ class DB(LoggedClass):
} }
batch.put(b'state', repr(state).encode()) batch.put(b'state', repr(state).encode())
def clean_db(self):
'''Clean out stale DB items.
Stale DB items are excess history flushed since the most
recent UTXO flush (only happens on unclean shutdown), and aged
undo information.
'''
if self.flush_count > self.utxo_flush_count:
self.utxo_flush_count = self.flush_count
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
history_keys = self.excess_history_keys()
self.logger.info('deleting {:,d} history entries'
.format(len(history_keys)))
else:
history_keys = []
undo_keys = self.stale_undo_keys()
if undo_keys:
self.logger.info('deleting {:,d} stale undo entries'
.format(len(undo_keys)))
with self.db.write_batch() as batch:
batch_delete = batch.delete
for key in history_keys:
batch_delete(key)
for key in undo_keys:
batch_delete(key)
self.write_state(batch)
def excess_history_keys(self):
prefix = b'H'
keys = []
for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
return keys
def stale_undo_keys(self):
prefix = b'U'
cutoff = self.db_height - self.env.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
return keys
def open_file(self, filename, create=False): def open_file(self, filename, create=False):
'''Open the file name. Return its handle.''' '''Open the file name. Return its handle.'''
try: try:

Loading…
Cancel
Save