Browse Source

Merge branch 'serve_disk_only' into develop

master
Neil Booth 8 years ago
parent
commit
db5d0dd6cb
  1. 64
      server/block_processor.py
  2. 50
      server/db.py
  3. 9
      server/protocol.py

64
server/block_processor.py

@ -10,7 +10,6 @@
import array
import asyncio
import itertools
import os
from struct import pack, unpack
import time
@ -437,47 +436,11 @@ class BlockProcessor(server.db.DB):
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
blocks_done = len(self.headers)
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.fs_height + blocks_done == self.height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == self.height + 1
assert cur_tx_count == self.tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
assert self.fs_height + len(self.headers) == self.height
assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
self.fs_update(self.fs_height, self.headers, self.tx_hashes)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.tx_hashes = []
@ -819,23 +782,6 @@ class BlockProcessor(server.db.DB):
self.db_height = self.height
self.db_tip = self.tip
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, max(0, self.fs_height + 1 - start))
result = self.fs_read_headers(start, disk_count)
count -= disk_count
start += disk_count
# The rest from memory
if count:
start -= self.fs_height + 1
if not (count >= 0 and start + count <= len(self.headers)):
raise ChainError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
tx_hash, tx_height = self.fs_tx_hash(tx_num)

50
server/db.py

@ -10,6 +10,7 @@
import array
import ast
import itertools
import os
from struct import pack, unpack
from bisect import bisect_right
@ -117,7 +118,6 @@ class DB(LoggedClass):
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
def write_state(self, batch):
'''Write chain state to the batch.'''
state = {
@ -142,7 +142,51 @@ class DB(LoggedClass):
return open(filename, 'wb+')
raise
def fs_read_headers(self, start, count):
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.
Their first height is fs_height. No recorded DB state is
updated. These arrays are all append only, so in a crash we
just pick up again from the DB height.
'''
blocks_done = len(self.headers)
new_height = fs_height + blocks_done
prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
# First the headers
self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN)
self.headers_file.write(b''.join(headers))
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
def read_headers(self, start, count):
'''Requires count >= 0.'''
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)
@ -172,7 +216,7 @@ class DB(LoggedClass):
return f.read(32), tx_height
def fs_block_hashes(self, height, count):
headers = self.fs_read_headers(height, count)
headers = self.read_headers(height, count)
# FIXME: move to coins.py
hlen = self.coin.HEADER_LEN
return [self.coin.header_hash(header)

9
server/protocol.py

@ -311,7 +311,8 @@ class ServerManager(util.LoggedClass):
for session in self.sessions:
if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.messages.put_nowait((self.bp.height, touched, cache))
triple = (self.bp.db_height, touched, cache)
session.messages.put_nowait(triple)
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
@ -377,7 +378,7 @@ class ServerManager(util.LoggedClass):
async def rpc_getinfo(self, params):
'''The RPC 'getinfo' call.'''
return {
'blocks': self.bp.height,
'blocks': self.bp.db_height,
'peers': len(self.irc.peers),
'sessions': self.session_count(),
'watched': self.subscription_count,
@ -592,8 +593,8 @@ class ElectrumX(Session):
.format(self.peername(), len(matches)))
def height(self):
'''Return the block processor's current height.'''
return self.bp.height
'''Return the current flushed database height.'''
return self.bp.db_height
def current_electrum_header(self):
'''Used as response to a headers subscription request.'''

Loading…
Cancel
Save