Browse Source

Move bulk of FS flush code to db.py

More logically belongs there.
Have servers use the flushed DB height not the block processor's height.
Get all headers only from disk.
master
Neil Booth 8 years ago
parent
commit
59244cc24e
  1. 64
      server/block_processor.py
  2. 50
      server/db.py
  3. 9
      server/protocol.py

64
server/block_processor.py

@ -10,7 +10,6 @@
import array import array
import asyncio import asyncio
import itertools
import os import os
from struct import pack, unpack from struct import pack, unpack
import time import time
@ -438,47 +437,11 @@ class BlockProcessor(server.db.DB):
def fs_flush(self): def fs_flush(self):
'''Flush the things stored on the filesystem.''' '''Flush the things stored on the filesystem.'''
blocks_done = len(self.headers) assert self.fs_height + len(self.headers) == self.height
prior_tx_count = (self.tx_counts[self.fs_height] assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
if self.fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 self.fs_update(self.fs_height, self.headers, self.tx_hashes)
txs_done = cur_tx_count - prior_tx_count
assert self.fs_height + blocks_done == self.height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == self.height + 1
assert cur_tx_count == self.tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
self.fs_height = self.height self.fs_height = self.height
self.fs_tx_count = self.tx_count self.fs_tx_count = self.tx_count
self.tx_hashes = [] self.tx_hashes = []
@ -824,23 +787,6 @@ class BlockProcessor(server.db.DB):
self.db_height = self.height self.db_height = self.height
self.db_tip = self.tip self.db_tip = self.tip
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, max(0, self.fs_height + 1 - start))
result = self.fs_read_headers(start, disk_count)
count -= disk_count
start += disk_count
# The rest from memory
if count:
start -= self.fs_height + 1
if not (count >= 0 and start + count <= len(self.headers)):
raise ChainError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num): def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.''' '''Returns the tx_hash and height of a tx number.'''
tx_hash, tx_height = self.fs_tx_hash(tx_num) tx_hash, tx_height = self.fs_tx_hash(tx_num)

50
server/db.py

@ -10,6 +10,7 @@
import array import array
import ast import ast
import itertools
import os import os
from struct import pack, unpack from struct import pack, unpack
from bisect import bisect_right from bisect import bisect_right
@ -117,7 +118,6 @@ class DB(LoggedClass):
self.logger.info('sync time so far: {}' self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time))) .format(formatted_time(self.wall_time)))
def write_state(self, batch): def write_state(self, batch):
'''Write chain state to the batch.''' '''Write chain state to the batch.'''
state = { state = {
@ -142,7 +142,51 @@ class DB(LoggedClass):
return open(filename, 'wb+') return open(filename, 'wb+')
raise raise
def fs_read_headers(self, start, count): def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.
Their first height is fs_height. No recorded DB state is
updated. These arrays are all append only, so in a crash we
just pick up again from the DB height.
'''
blocks_done = len(self.headers)
new_height = fs_height + blocks_done
prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
# First the headers
self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN)
self.headers_file.write(b''.join(headers))
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
def read_headers(self, start, count):
'''Requires count >= 0.''' '''Requires count >= 0.'''
# Read some from disk # Read some from disk
disk_count = min(count, self.db_height + 1 - start) disk_count = min(count, self.db_height + 1 - start)
@ -172,7 +216,7 @@ class DB(LoggedClass):
return f.read(32), tx_height return f.read(32), tx_height
def fs_block_hashes(self, height, count): def fs_block_hashes(self, height, count):
headers = self.fs_read_headers(height, count) headers = self.read_headers(height, count)
# FIXME: move to coins.py # FIXME: move to coins.py
hlen = self.coin.HEADER_LEN hlen = self.coin.HEADER_LEN
return [self.coin.header_hash(header) return [self.coin.header_hash(header)

9
server/protocol.py

@ -311,7 +311,8 @@ class ServerManager(util.LoggedClass):
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON # Use a tuple to distinguish from JSON
session.messages.put_nowait((self.bp.height, touched, cache)) triple = (self.bp.db_height, touched, cache)
session.messages.put_nowait(triple)
async def shutdown(self): async def shutdown(self):
'''Call to shutdown the servers. Returns when done.''' '''Call to shutdown the servers. Returns when done.'''
@ -377,7 +378,7 @@ class ServerManager(util.LoggedClass):
async def rpc_getinfo(self, params): async def rpc_getinfo(self, params):
'''The RPC 'getinfo' call.''' '''The RPC 'getinfo' call.'''
return { return {
'blocks': self.bp.height, 'blocks': self.bp.db_height,
'peers': len(self.irc.peers), 'peers': len(self.irc.peers),
'sessions': self.session_count(), 'sessions': self.session_count(),
'watched': self.subscription_count, 'watched': self.subscription_count,
@ -592,8 +593,8 @@ class ElectrumX(Session):
.format(self.peername(), len(matches))) .format(self.peername(), len(matches)))
def height(self): def height(self):
'''Return the block processor's current height.''' '''Return the current flushed database height.'''
return self.bp.height return self.bp.db_height
def current_electrum_header(self): def current_electrum_header(self):
'''Used as response to a headers subscription request.''' '''Used as response to a headers subscription request.'''

Loading…
Cancel
Save