
Merge branch 'release-0.7.11'

master 0.7.11
Neil Booth 8 years ago
commit 812509ab08
  1. RELEASE-NOTES (16 changed lines)
  2. docs/ENV-NOTES (28 changed lines)
  3. server/block_processor.py (168 changed lines)
  4. server/db.py (117 changed lines)
  5. server/env.py (2 changed lines)
  6. server/irc.py (2 changed lines)
  7. server/protocol.py (11 changed lines)
  8. server/storage.py (2 changed lines)
  9. server/version.py (2 changed lines)

RELEASE-NOTES (16 changed lines)

@ -1,3 +1,19 @@
version 0.7.11
--------------
- increased MAX_SEND default value to 1 million bytes so as to be able
to serve large historical transactions of up to ~500K in size. The
MAX_SEND floor remains at 350,000 bytes so you can reduce it if you
wish. To serve any historical transaction for bitcoin you should
set this to around 2,000,100 bytes (one byte becomes 2 ASCII hex chars)
- issue #46: fix reorgs for coinbase-only blocks. We did not distinguish
undo information being empty from it not existing (see the sketch at the
end of these notes)
- issue #47: IRC reconnects. I don't see this issue myself so cannot be
certain it is resolved
- $VERSION in your banner file is now replaced with your ElectrumX version
- more work to isolate the DB from the block processor, working towards the
goal of asynchronous block updates
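
A minimal sketch of the issue #46 distinction, using a plain Python dict in
place of the real database (the None-for-missing behaviour matches the
read_undo_info() change in server/db.py below, and the b'U' key prefix
mirrors undo_key() there):

from struct import pack

db = {}   # stand-in for the DB wrapper; get() returns None for an absent key

def undo_key(height):
    return b'U' + pack('>I', height)

def write_undo_info(height, undo_info):
    db[undo_key(height)] = undo_info

def read_undo_info(height):
    return db.get(undo_key(height))

write_undo_info(100, b'')            # coinbase-only block: empty but real undo data

undo = read_undo_info(100)
assert not undo                      # old check ("if not undo_info") wrongly treats it as missing
assert undo is not None              # new check ("if undo_info is None") sees that it exists

assert read_undo_info(101) is None   # genuinely absent undo information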
version 0.7.10
--------------

docs/ENV-NOTES (28 changed lines)

@ -1,7 +1,7 @@
The following environment variables are required:
DB_DIRECTORY - path to the database directory (if relative, to `run` script)
USERNAME - the username the server will run as if using `run` script
USERNAME - the username the server will run as, if using `run` script
ELECTRUMX - path to the electrumx_server.py script (if relative,
to `run` script)
DAEMON_URL - A comma-separated list of daemon URLs. If more than one is
@ -33,7 +33,9 @@ SSL_PORT - if set will serve Electrum SSL clients on that HOST:SSL_PORT
RPC_PORT - Listen on this port for local RPC connections, defaults to
8000.
BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client.
is re-read for each new client. The string $VERSION in your
banner file will be replaced with the ElectrumX version you
are running, such as 'ElectrumX 0.7.11'.
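
The substitution is a plain string replace, as the server/protocol.py hunk
further down shows. A minimal sketch with a made-up banner string (only the
$VERSION handling matches the server code):

VERSION = 'ElectrumX 0.7.11'

banner = 'Welcome to my node, powered by $VERSION\n'
banner = banner.replace('$VERSION', VERSION)
print(banner)   # Welcome to my node, powered by ElectrumX 0.7.11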
ANON_LOGS - set to anything non-empty to remove IP addresses from
logs. By default IP addresses will be logged.
DONATION_ADDRESS - server donation address. Defaults to none.
@ -45,20 +47,22 @@ each and are processed efficiently. I feel the defaults are low and
encourage you to raise them.
MAX_SEND - maximum size of a response message to send over the wire,
in bytes. Defaults to 350,000 and will treat smaller
values as the same because standard Electrum protocol
header chunk requests are nearly that large.
in bytes. Defaults to 1,000,000 and will treat values
smaller than 350,000 as 350,000 because standard Electrum
protocol header chunk requests are almost that large.
The Electrum protocol has a flaw in that address
histories must be served all at once or not at all,
an obvious avenue for abuse. MAX_SEND is a
stop-gap until the protocol is improved to admit
incremental history requests. Each history entry
is approximately 100 bytes so the default is
equivalent to a history limit of around 3,500
equivalent to a history limit of around 10,000
entries, which should be ample for most legitimate
users. Increasing by a single-digit factor is
likely fine but bear in mind one client can request
history for multiple addresses.
users. If you increase this bear in mind one client
can request history for multiple addresses. Also,
the largest raw transaction you will be able to serve
to a client is just under half of MAX_SEND, as each raw
byte becomes 2 hexadecimal ASCII characters on the wire.
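
A back-of-envelope sketch of the numbers quoted above (not part of ElectrumX;
only the 1,000,000 default and the ~100-byte history-entry estimate come from
this entry, the rest is arithmetic):

MAX_SEND = 1000000            # documented default, in bytes
HISTORY_ENTRY_BYTES = 100     # approximate size of one history entry

print(MAX_SEND // HISTORY_ENTRY_BYTES)   # ~10,000 history entries per response

# Each raw transaction byte becomes 2 hexadecimal characters on the wire, so
# the largest raw transaction one response can carry is just under MAX_SEND / 2.
print(MAX_SEND // 2)                      # ~500,000 raw bytes

# Serving any pre-segwit bitcoin transaction (bounded by the roughly
# 1,000,000-byte block size) needs about 2 * 1,000,000 plus a little overhead,
# hence the ~2,000,100 figure in the release notes.
print(2 * 1000000 + 100)                  # 2000100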
MAX_SUBS - maximum number of address subscriptions across all
sessions. Defaults to 250,000.
MAX_SESSION_SUBS - maximum number of address subscriptions permitted to a
@ -101,5 +105,7 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before
The following are for debugging purposes.
FORCE_REORG - if set to a positive integer, it will simulate a reorg
of the blockchain for that number of blocks. Do not set
to a value greater than REORG_LIMIT.
of the blockchain for that number of blocks on startup.
Although it should fail gracefully if set to a value
greater than REORG_LIMIT, I do not recommend it as I have
not tried it and there is a chance your DB might become corrupted.
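
A small sketch of setting the debug variable from Python before launching the
server (the usual route is exporting environment variables in the `run`
script; the values here are made up):

import os

os.environ['FORCE_REORG'] = '3'   # simulate a 3-block reorg on startup

# Keep it no larger than REORG_LIMIT if that is set, per the warning above.
limit = os.environ.get('REORG_LIMIT')
if limit is not None:
    assert int(os.environ['FORCE_REORG']) <= int(limit)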

server/block_processor.py (168 changed lines)

@ -10,8 +10,6 @@
import array
import asyncio
import itertools
import os
from struct import pack, unpack
import time
from bisect import bisect_left
@ -23,7 +21,6 @@ from server.version import VERSION
from lib.hash import hash_to_str
from lib.util import chunks, formatted_time, LoggedClass
import server.db
from server.storage import open_db
class ChainError(Exception):
@ -148,13 +145,11 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self.event = asyncio.Event()
self.touched = set()
# Meta
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.reorg_limit = env.reorg_limit
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
@ -173,14 +168,11 @@ class BlockProcessor(server.db.DB):
self.db_deletes = []
# Log state
self.logger.info('reorg limit is {:,d} blocks'
.format(self.reorg_limit))
if self.first_sync:
self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
self.clean_db()
async def main_loop(self):
'''Main loop for block processing.'''
@ -189,7 +181,7 @@ class BlockProcessor(server.db.DB):
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
await self.handle_chain_reorg(self.env.force_reorg, set())
while True:
await self._wait_for_update()
@ -215,22 +207,22 @@ class BlockProcessor(server.db.DB):
if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
touched = set()
try:
for block in blocks:
self.advance_block(block, self.caught_up)
self.advance_block(block, touched)
await asyncio.sleep(0) # Yield
except ChainReorg:
await self.handle_chain_reorg(None)
await self.handle_chain_reorg(None, touched)
if self.caught_up:
# Flush everything as queries are performed on the DB and
# not in-memory.
self.flush(True)
self.client.notify(self.touched)
self.client.notify(touched)
elif time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 60
self.touched = set()
def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.'''
@ -242,7 +234,7 @@ class BlockProcessor(server.db.DB):
self.flush(True)
self.event.set()
async def handle_chain_reorg(self, count):
async def handle_chain_reorg(self, count, touched):
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
@ -256,7 +248,7 @@ class BlockProcessor(server.db.DB):
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks)
self.backup_blocks(blocks, touched)
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
@ -291,58 +283,12 @@ class BlockProcessor(server.db.DB):
else:
start = (self.height - count) + 1
self.logger.info('chain was reorganised for {:,d} blocks over '
'heights {:,d}-{:,d} inclusive'
self.logger.info('chain was reorganised: {:,d} blocks at '
'heights {:,d}-{:,d} were replaced'
.format(count, start, start + count - 1))
return self.fs_block_hashes(start, count)
def clean_db(self):
'''Clean out stale DB items.
Stale DB items are excess history flushed since the most
recent UTXO flush (only happens on unclean shutdown), and aged
undo information.
'''
if self.flush_count < self.utxo_flush_count:
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
with self.db.write_batch() as batch:
if self.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
self.remove_excess_history(batch)
self.utxo_flush_count = self.flush_count
self.remove_stale_undo_items(batch)
self.flush_state(batch)
def remove_excess_history(self, batch):
prefix = b'H'
keys = []
for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def remove_stale_undo_items(self, batch):
prefix = b'U'
cutoff = self.db_height - self.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
self.logger.info('deleting {:,d} stale undo entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
@ -438,47 +384,11 @@ class BlockProcessor(server.db.DB):
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
blocks_done = len(self.headers)
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.fs_height + blocks_done == self.height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == self.height + 1
assert cur_tx_count == self.tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
assert self.fs_height + len(self.headers) == self.height
assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
self.fs_update(self.fs_height, self.headers, self.tx_hashes)
self.fs_height = self.height
self.fs_tx_count = self.tx_count
self.tx_hashes = []
@ -542,18 +452,6 @@ class BlockProcessor(server.db.DB):
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(utxo_MB >= self.utxo_MB)
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def write_undo_info(self, height, undo_info):
'''Write out undo information for the current height.'''
self.db.put(self.undo_key(height), undo_info)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def fs_advance_block(self, header, tx_hashes, txs):
'''Update unflushed FS state for a new block.'''
prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
@ -563,7 +461,7 @@ class BlockProcessor(server.db.DB):
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs))
def advance_block(self, block, update_touched):
def advance_block(self, block, touched):
# We must update the FS cache before calling advance_txs() as
# the UTXO cache uses the FS cache via get_tx_hash() to
# resolve compressed key collisions
@ -571,17 +469,13 @@ class BlockProcessor(server.db.DB):
if self.tip != self.coin.header_prevhash(header):
raise ChainReorg
touched = set()
self.fs_advance_block(header, tx_hashes, txs)
self.tip = self.coin.header_hash(header)
self.height += 1
undo_info = self.advance_txs(tx_hashes, txs, touched)
if self.daemon.cached_height() - self.height <= self.reorg_limit:
if self.daemon.cached_height() - self.height <= self.env.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info))
if update_touched:
self.touched.update(touched)
def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo
@ -623,7 +517,7 @@ class BlockProcessor(server.db.DB):
return undo_info
def backup_blocks(self, blocks):
def backup_blocks(self, blocks, touched):
'''Backup the blocks and flush.
The blocks should be in order of decreasing height.
@ -632,7 +526,6 @@ class BlockProcessor(server.db.DB):
self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed()
touched = set()
for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block)
header_hash = self.coin.header_hash(header)
@ -649,7 +542,8 @@ class BlockProcessor(server.db.DB):
self.logger.info('backed up to height {:,d}'.format(self.height))
self.touched.update(touched)
# touched includes those passed into this function. That will
# generally be empty but is harmless if not.
flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history)
@ -657,7 +551,7 @@ class BlockProcessor(server.db.DB):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
if not undo_info:
if undo_info is None:
raise ChainError('no undo information found for height {:,d}'
.format(self.height))
n = len(undo_info)
@ -794,6 +688,7 @@ class BlockProcessor(server.db.DB):
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
flush_start = time.time()
delete_count = len(self.db_deletes) // 2
batch_delete = batch.delete
for key in self.db_deletes:
@ -813,34 +708,15 @@ class BlockProcessor(server.db.DB):
'adds, {:,d} spends in {:.1f}s, committing...'
.format(self.height - self.db_height,
self.tx_count - self.db_tx_count,
len(self.utxo_cache),
len(self.db_deletes) // 2,
len(self.utxo_cache), delete_count,
time.time() - flush_start))
self.utxo_cache = {}
self.db_deletes = []
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, max(0, self.fs_height + 1 - start))
result = self.fs_read_headers(start, disk_count)
count -= disk_count
start += disk_count
# The rest from memory
if count:
start -= self.fs_height + 1
if not (count >= 0 and start + count <= len(self.headers)):
raise ChainError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
tx_hash, tx_height = self.fs_tx_hash(tx_num)

server/db.py (117 changed lines)

@ -10,6 +10,7 @@
import array
import ast
import itertools
import os
from struct import pack, unpack
from bisect import bisect_right
@ -46,6 +47,8 @@ class DB(LoggedClass):
self.logger.info('switching current directory to {}'
.format(env.db_dir))
os.chdir(env.db_dir)
self.logger.info('reorg limit is {:,d} blocks'
.format(self.env.reorg_limit))
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
@ -72,6 +75,7 @@ class DB(LoggedClass):
assert self.db_tx_count == self.tx_counts[-1]
else:
assert self.db_tx_count == 0
self.clean_db()
def read_state(self):
if self.db.is_new:
@ -116,7 +120,8 @@ class DB(LoggedClass):
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
if self.flush_count < self.utxo_flush_count:
raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
def write_state(self, batch):
'''Write chain state to the batch.'''
@ -133,6 +138,68 @@ class DB(LoggedClass):
}
batch.put(b'state', repr(state).encode())
def clean_db(self):
'''Clean out stale DB items.
Stale DB items are excess history flushed since the most
recent UTXO flush (only happens on unclean shutdown), and aged
undo information.
'''
if self.flush_count > self.utxo_flush_count:
self.utxo_flush_count = self.flush_count
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
history_keys = self.excess_history_keys()
self.logger.info('deleting {:,d} history entries'
.format(len(history_keys)))
else:
history_keys = []
undo_keys = self.stale_undo_keys()
if undo_keys:
self.logger.info('deleting {:,d} stale undo entries'
.format(len(undo_keys)))
with self.db.write_batch() as batch:
batch_delete = batch.delete
for key in history_keys:
batch_delete(key)
for key in undo_keys:
batch_delete(key)
self.write_state(batch)
def excess_history_keys(self):
prefix = b'H'
keys = []
for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
return keys
def stale_undo_keys(self):
prefix = b'U'
cutoff = self.db_height - self.env.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
return keys
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + pack('>I', height)
def write_undo_info(self, height, undo_info):
'''Write out undo information for the current height.'''
self.db.put(self.undo_key(height), undo_info)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
@ -142,7 +209,51 @@ class DB(LoggedClass):
return open(filename, 'wb+')
raise
def fs_read_headers(self, start, count):
def fs_update(self, fs_height, headers, block_tx_hashes):
'''Write headers, the tx_count array and block tx hashes to disk.
Their first height is fs_height. No recorded DB state is
updated. These arrays are all append only, so in a crash we
just pick up again from the DB height.
'''
blocks_done = len(self.headers)
new_height = fs_height + blocks_done
prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
# First the headers
self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN)
self.headers_file.write(b''.join(headers))
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
def read_headers(self, start, count):
'''Requires count >= 0.'''
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)
@ -172,7 +283,7 @@ class DB(LoggedClass):
return f.read(32), tx_height
def fs_block_hashes(self, height, count):
headers = self.fs_read_headers(height, count)
headers = self.read_headers(height, count)
# FIXME: move to coins.py
hlen = self.coin.HEADER_LEN
return [self.coin.header_hash(header)

server/env.py (2 changed lines)

@ -45,7 +45,7 @@ class Env(LoggedClass):
self.donation_address = self.default('DONATION_ADDRESS', '')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
# Server limits to help prevent DoS
self.max_send = self.integer('MAX_SEND', 250000)
self.max_send = self.integer('MAX_SEND', 1000000)
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
# IRC

server/irc.py (2 changed lines)

@ -80,9 +80,9 @@ class IRC(LoggedClass):
'namreply', 'disconnect']:
reactor.add_global_handler(event, getattr(self, 'on_' + event))
connection = reactor.server()
while True:
try:
connection = reactor.server()
connection.connect(self.irc_server, self.irc_port,
self.nick, ircname=self.real_name)
connection.set_keepalive(60)

server/protocol.py (11 changed lines)

@ -311,7 +311,8 @@ class ServerManager(util.LoggedClass):
for session in self.sessions:
if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.messages.put_nowait((self.bp.height, touched, cache))
triple = (self.bp.db_height, touched, cache)
session.messages.put_nowait(triple)
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
@ -377,7 +378,7 @@ class ServerManager(util.LoggedClass):
async def rpc_getinfo(self, params):
'''The RPC 'getinfo' call.'''
return {
'blocks': self.bp.height,
'blocks': self.bp.db_height,
'peers': len(self.irc.peers),
'sessions': self.session_count(),
'watched': self.subscription_count,
@ -592,8 +593,8 @@ class ElectrumX(Session):
.format(self.peername(), len(matches)))
def height(self):
'''Return the block processor's current height.'''
return self.bp.height
'''Return the current flushed database height.'''
return self.bp.db_height
def current_electrum_header(self):
'''Used as response to a headers subscription request.'''
@ -836,6 +837,8 @@ class ElectrumX(Session):
except Exception as e:
self.logger.error('reading banner file {}: {}'
.format(self.env.banner_file, e))
else:
banner = banner.replace('$VERSION', VERSION)
return banner
async def donation_address(self, params):

server/storage.py (2 changed lines)

@ -77,7 +77,7 @@ class LevelDB(Storage):
def open(self, name, create):
self.db = self.module.DB(name, create_if_missing=create,
compression=None)
max_open_files=256, compression=None)
self.get = self.db.get
self.put = self.db.put
self.iterator = self.db.iterator

server/version.py (2 changed lines)

@ -1 +1 @@
VERSION = "ElectrumX 0.7.10"
VERSION = "ElectrumX 0.7.11"
