Browse Source

Move fs_flush to db.py and merge with fs_update

patch-2
Neil Booth 7 years ago
parent
commit
11c6c919a6
  1. 20
      electrumx/server/block_processor.py
  2. 67
      electrumx/server/db.py

20
electrumx/server/block_processor.py

@ -344,11 +344,12 @@ class BlockProcessor(electrumx.server.db.DB):
tx_diff = self.tx_count - self.last_flush_tx_count tx_diff = self.tx_count - self.last_flush_tx_count
# Flush to file system # Flush to file system
self.fs_flush() self.fs_flush(self.height, self.tx_count, self.headers,
self.tx_hashes)
self.tx_hashes = []
self.headers = []
fs_end = time.time() fs_end = time.time()
if self.utxo_db.for_sync:
self.logger.info('flushed to FS in {:.1f}s'
.format(fs_end - flush_start))
# History next - it's fast and frees memory # History next - it's fast and frees memory
hashX_count = self.history.flush() hashX_count = self.history.flush()
@ -395,17 +396,6 @@ class BlockProcessor(electrumx.server.db.DB):
.format(formatted_time(self.wall_time), .format(formatted_time(self.wall_time),
formatted_time(tx_est / this_tx_per_sec))) formatted_time(tx_est / this_tx_per_sec)))
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
assert self.fs_height + len(self.headers) == self.height
assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
self.fs_update(self.fs_height, self.headers, self.tx_hashes)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.tx_hashes = []
self.headers = []
async def backup_flush(self): async def backup_flush(self):
assert self.height < self.db_height assert self.height < self.db_height
assert not self.headers assert not self.headers

67
electrumx/server/db.py

@ -150,6 +150,43 @@ class DB(object):
return await self.header_mc.branch_and_root(length, height) return await self.header_mc.branch_and_root(length, height)
# Flushing # Flushing
def fs_flush(self, to_height, to_tx_count, headers, block_tx_hashes):
'''Write headers, tx counts and block tx hashes to the filesystem.
No LevelDB state is updated.
The first height to write is self.fs_height + 1. The FS
metadata is all append-only, so in a crash we just pick up
again from the height stored in the DB.
'''
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(block_tx_hashes) == len(headers)
assert to_height == self.fs_height + len(headers)
assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0
assert len(self.tx_counts) == to_height + 1
hashes = b''.join(block_tx_hashes)
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == to_tx_count - prior_tx_count
# Write the headers, tx counts, and tx hashes
start_time = time.time()
height_start = self.fs_height + 1
offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(headers))
self.fs_update_header_offsets(offset, height_start, headers)
offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes)
self.fs_height = to_height
self.fs_tx_count = to_tx_count
if self.utxo_db.for_sync:
elapsed = time.time() - start_time
self.logger.info(f'flushed to FS in {elapsed:.2f}s')
def db_assert_flushed(self, to_tx_count, to_height): def db_assert_flushed(self, to_tx_count, to_height):
'''Asserts state is fully flushed.''' '''Asserts state is fully flushed.'''
assert to_tx_count == self.fs_tx_count == self.db_tx_count assert to_tx_count == self.fs_tx_count == self.db_tx_count
@ -185,36 +222,6 @@ class DB(object):
# Truncate header_mc: header count is 1 more than the height. # Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1) self.header_mc.truncate(height + 1)
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.
Their first height is fs_height. No recorded DB state is
updated. These arrays are all append only, so in a crash we
just pick up again from the DB height.
'''
blocks_done = len(headers)
height_start = fs_height + 1
new_height = fs_height + blocks_done
prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert len(block_tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
hashes = b''.join(block_tx_hashes)
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
# Write the headers, tx counts, and tx hashes
offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(headers))
self.fs_update_header_offsets(offset, height_start, headers)
offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes)
async def read_headers(self, start_height, count): async def read_headers(self, start_height, count):
'''Requires start_height >= 0, count >= 0. Reads as many headers as '''Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This are available starting at start_height up to count. This

Loading…
Cancel
Save