diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index b1ff9c0..d283fc3 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,19 @@
+version 0.7.11
+--------------
+
+- increased the MAX_SEND default value to 1 million bytes so as to be
+  able to serve large historical transactions of up to ~500K in size.
+  The MAX_SEND floor remains at 350,000 bytes so you can reduce it if
+  you wish.  To serve any historical bitcoin transaction you should
+  set this to around 2,000,100 bytes (one byte becomes 2 ASCII hex chars)
+- issue #46: fix reorgs of coinbase-only blocks.  We did not distinguish
+  undo information being empty from it not existing
+- issue #47: IRC reconnects.  I cannot reproduce this issue so cannot
+  be certain it is resolved
+- $VERSION in your banner file is now replaced with your ElectrumX version
+- more work to isolate the DB from the block processor, working towards
+  the goal of asynchronous block updates
+
 version 0.7.10
 --------------
 
diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES
index c12bb11..ee70098 100644
--- a/docs/ENV-NOTES
+++ b/docs/ENV-NOTES
@@ -1,7 +1,7 @@
 The following environment variables are required:
 
 DB_DIRECTORY - path to the database directory (if relative, to `run` script)
-USERNAME     - the username the server will run as if using `run` script
+USERNAME     - the username the server will run as, if using `run` script
 ELECTRUMX    - path to the electrumx_server.py script (if relative, to
                `run` script)
 DAEMON_URL   - A comma-separated list of daemon URLs.  If more than one is
@@ -33,7 +33,9 @@ SSL_PORT - if set will serve Electrum SSL clients on that HOST:SSL_PORT
 RPC_PORT     - Listen on this port for local RPC connections, defaults
                to 8000.
 BANNER_FILE  - a path to a banner file to serve to clients.  The banner file
-               is re-read for each new client.
+               is re-read for each new client.  The string $VERSION in your
+               banner file will be replaced with the ElectrumX version you
+               are running, such as 'ElectrumX 0.7.11'.
 ANON_LOGS    - set to anything non-empty to remove IP addresses from logs.
                By default IP addresses will be logged.
 DONATION_ADDRESS - server donation address.  Defaults to none.
@@ -45,20 +47,22 @@ each and are processed efficiently.
 I feel the defaults are low and encourage you to raise them.
 
 MAX_SEND     - maximum size of a response message to send over the wire,
-               in bytes.  Defaults to 350,000 and will treat smaller
-               values as the same because standard Electrum protocol
-               header chunk requests are nearly that large.
+               in bytes.  Defaults to 1,000,000 and will treat values
+               smaller than 350,000 as 350,000 because standard Electrum
+               protocol header chunk requests are almost that large.
                The Electrum protocol has a flaw in that address
                histories must be served all at once or not at all,
                an obvious avenue for abuse.  MAX_SEND is a stop-gap
                until the protocol is improved to admit incremental
                history requests.  Each history entry is approximately
                100 bytes so the default is
-               equivalent to a history limit of around 3,500
+               equivalent to a history limit of around 10,000
                entries, which should be ample for most legitimate
-               users.  Increasing by a single-digit factor is
-               likely fine but bear in mind one client can request
-               history for multiple addresses.
+               users.  If you increase this, bear in mind one client
+               can request history for multiple addresses.  Also,
+               the largest raw transaction you will be able to serve
+               to a client is just under half of MAX_SEND, as each raw
+               byte becomes 2 hexadecimal ASCII characters on the wire.
 MAX_SUBS     - maximum number of address subscriptions across all
                sessions.  Defaults to 250,000.
 MAX_SESSION_SUBS - maximum number of address subscriptions permitted to a
@@ -101,5 +105,7 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before
 The following are for debugging purposes.
 
 FORCE_REORG  - if set to a positive integer, it will simulate a reorg
-               of the blockchain for that number of blocks.  Do not set
-               to a value greater than REORG_LIMIT.
\ No newline at end of file
+               of the blockchain for that number of blocks on startup.
+               Although it should fail gracefully if set to a value
+               greater than REORG_LIMIT, I do not recommend that as I
+               have not tried it and there is a chance of DB corruption.
\ No newline at end of file
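The MAX_SEND notes above pack several numbers into prose.  Here is a
standalone Python sketch of the same arithmetic; it is not code from
the repository, and every name in it is invented for illustration:

# Sketch of the MAX_SEND arithmetic described in ENV-NOTES above.
# Illustrative only; none of these names exist in ElectrumX.

MAX_SEND_FLOOR = 350000        # values below this are treated as this
HISTORY_ENTRY_BYTES = 100      # approximate size of one history entry

def effective_max_send(max_send):
    '''Apply the server's floor to a configured MAX_SEND value.'''
    return max(max_send, MAX_SEND_FLOOR)

def approx_history_limit(max_send):
    '''Roughly how many history entries fit in one response.'''
    return effective_max_send(max_send) // HISTORY_ENTRY_BYTES

def largest_servable_tx(max_send):
    '''Largest raw transaction that fits in one response.  Each raw
    byte goes over the wire as 2 ASCII hex characters, hence // 2.'''
    return effective_max_send(max_send) // 2

assert approx_history_limit(1000000) == 10000    # the new default
assert largest_servable_tx(1000000) == 500000    # ~500K transactions
# Serving any historical bitcoin transaction (up to 1,000,000 raw
# bytes) therefore needs MAX_SEND of about 2,000,100 bytes, as the
# release notes say.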
diff --git a/server/block_processor.py b/server/block_processor.py
index 583dad4..a1619cb 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -10,8 +10,6 @@
 
 import array
 import asyncio
-import itertools
-import os
 from struct import pack, unpack
 import time
 from bisect import bisect_left
@@ -23,7 +21,6 @@ from server.version import VERSION
 from lib.hash import hash_to_str
 from lib.util import chunks, formatted_time, LoggedClass
 import server.db
-from server.storage import open_db
 
 
 class ChainError(Exception):
@@ -148,13 +145,11 @@ class BlockProcessor(server.db.DB):
         self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
         self.caught_up = False
         self.event = asyncio.Event()
-        self.touched = set()
 
         # Meta
         self.utxo_MB = env.utxo_MB
         self.hist_MB = env.hist_MB
         self.next_cache_check = 0
-        self.reorg_limit = env.reorg_limit
 
         # Headers and tx_hashes have one entry per block
         self.history = defaultdict(partial(array.array, 'I'))
@@ -173,14 +168,11 @@ class BlockProcessor(server.db.DB):
         self.db_deletes = []
 
         # Log state
-        self.logger.info('reorg limit is {:,d} blocks'
-                         .format(self.reorg_limit))
         if self.first_sync:
             self.logger.info('flushing UTXO cache at {:,d} MB'
                              .format(self.utxo_MB))
             self.logger.info('flushing history cache at {:,d} MB'
                              .format(self.hist_MB))
-        self.clean_db()
 
     async def main_loop(self):
         '''Main loop for block processing.'''
@@ -189,7 +181,7 @@
             if self.env.force_reorg > 0:
                 self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
                                  .format(self.env.force_reorg))
-                await self.handle_chain_reorg(self.env.force_reorg)
+                await self.handle_chain_reorg(self.env.force_reorg, set())
 
             while True:
                 await self._wait_for_update()
@@ -215,22 +207,22 @@
         if self.height == -1:
             blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
 
+        touched = set()
         try:
             for block in blocks:
-                self.advance_block(block, self.caught_up)
+                self.advance_block(block, touched)
                 await asyncio.sleep(0)  # Yield
         except ChainReorg:
-            await self.handle_chain_reorg(None)
+            await self.handle_chain_reorg(None, touched)
 
         if self.caught_up:
             # Flush everything as queries are performed on the DB and
             # not in-memory.
             self.flush(True)
-            self.client.notify(self.touched)
+            self.client.notify(touched)
         elif time.time() > self.next_cache_check:
             self.check_cache_size()
             self.next_cache_check = time.time() + 60
-        self.touched = set()
 
     def first_caught_up(self):
         '''Called when first caught up after start, or after a reorg.'''
@@ -242,7 +234,7 @@
         self.flush(True)
         self.event.set()
 
-    async def handle_chain_reorg(self, count):
+    async def handle_chain_reorg(self, count, touched):
         '''Handle a chain reorganisation.
 
         Count is the number of blocks to simulate a reorg, or None for
@@ -256,7 +248,7 @@ class BlockProcessor(server.db.DB):
         hashes = [hash_to_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
             blocks = await self.daemon.raw_blocks(hex_hashes)
-            self.backup_blocks(blocks)
+            self.backup_blocks(blocks, touched)
 
         await self.prefetcher.clear(self.height)
         self.logger.info('prefetcher reset')
@@ -291,58 +283,12 @@ class BlockProcessor(server.db.DB):
         else:
             start = (self.height - count) + 1
 
-        self.logger.info('chain was reorganised for {:,d} blocks over '
-                         'heights {:,d}-{:,d} inclusive'
+        self.logger.info('chain was reorganised: {:,d} blocks at '
+                         'heights {:,d}-{:,d} were replaced'
                          .format(count, start, start + count - 1))
 
         return self.fs_block_hashes(start, count)
 
-    def clean_db(self):
-        '''Clean out stale DB items.
-
-        Stale DB items are excess history flushed since the most
-        recent UTXO flush (only happens on unclean shutdown), and aged
-        undo information.
-        '''
-        if self.flush_count < self.utxo_flush_count:
-            raise ChainError('DB corrupt: flush_count < utxo_flush_count')
-        with self.db.write_batch() as batch:
-            if self.flush_count > self.utxo_flush_count:
-                self.logger.info('DB shut down uncleanly.  Scanning for '
-                                 'excess history flushes...')
-                self.remove_excess_history(batch)
-                self.utxo_flush_count = self.flush_count
-            self.remove_stale_undo_items(batch)
-            self.flush_state(batch)
-
-    def remove_excess_history(self, batch):
-        prefix = b'H'
-        keys = []
-        for key, hist in self.db.iterator(prefix=prefix):
-            flush_id, = unpack('>H', key[-2:])
-            if flush_id > self.utxo_flush_count:
-                keys.append(key)
-
-        self.logger.info('deleting {:,d} history entries'
-                         .format(len(keys)))
-        for key in keys:
-            batch.delete(key)
-
-    def remove_stale_undo_items(self, batch):
-        prefix = b'U'
-        cutoff = self.db_height - self.reorg_limit
-        keys = []
-        for key, hist in self.db.iterator(prefix=prefix):
-            height, = unpack('>I', key[-4:])
-            if height > cutoff:
-                break
-            keys.append(key)
-
-        self.logger.info('deleting {:,d} stale undo entries'
-                         .format(len(keys)))
-        for key in keys:
-            batch.delete(key)
-
     def flush_state(self, batch):
         '''Flush chain state to the batch.'''
         now = time.time()
@@ -438,47 +384,11 @@
 
     def fs_flush(self):
         '''Flush the things stored on the filesystem.'''
-        blocks_done = len(self.headers)
-        prior_tx_count = (self.tx_counts[self.fs_height]
-                          if self.fs_height >= 0 else 0)
-        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-        txs_done = cur_tx_count - prior_tx_count
-
-        assert self.fs_height + blocks_done == self.height
-        assert len(self.tx_hashes) == blocks_done
-        assert len(self.tx_counts) == self.height + 1
-        assert cur_tx_count == self.tx_count, \
-            'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
-
-        # First the headers
-        headers = b''.join(self.headers)
-        header_len = self.coin.HEADER_LEN
-        self.headers_file.seek((self.fs_height + 1) * header_len)
-        self.headers_file.write(headers)
-        self.headers_file.flush()
-
-        # Then the tx counts
-        self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
-        self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
-        self.txcount_file.flush()
-
-        # Finally the hashes
-        hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
-        assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == txs_done
-        cursor = 0
-        file_pos = prior_tx_count * 32
-        while cursor < len(hashes):
-            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
-            size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
-            filename = 'hashes{:04d}'.format(file_num)
-            with self.open_file(filename, create=True) as f:
-                f.seek(offset)
-                f.write(hashes[cursor:cursor + size])
-            cursor += size
-            file_pos += size
-
-        os.sync()
+        assert self.fs_height + len(self.headers) == self.height
+        assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
+
+        self.fs_update(self.fs_height, self.headers, self.tx_hashes)
+
         self.fs_height = self.height
         self.fs_tx_count = self.tx_count
         self.tx_hashes = []
@@ -542,18 +452,6 @@
         if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
             self.flush(utxo_MB >= self.utxo_MB)
 
-    def undo_key(self, height):
-        '''DB key for undo information at the given height.'''
-        return b'U' + pack('>I', height)
-
-    def write_undo_info(self, height, undo_info):
-        '''Write out undo information for the current height.'''
-        self.db.put(self.undo_key(height), undo_info)
-
-    def read_undo_info(self, height):
-        '''Read undo information from a file for the current height.'''
-        return self.db.get(self.undo_key(height))
-
     def fs_advance_block(self, header, tx_hashes, txs):
         '''Update unflushed FS state for a new block.'''
         prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
@@ -563,7 +461,7 @@
         self.tx_hashes.append(tx_hashes)
         self.tx_counts.append(prior_tx_count + len(txs))
 
-    def advance_block(self, block, update_touched):
+    def advance_block(self, block, touched):
         # We must update the FS cache before calling advance_txs() as
         # the UTXO cache uses the FS cache via get_tx_hash() to
         # resolve compressed key collisions
@@ -571,17 +469,13 @@
         if self.tip != self.coin.header_prevhash(header):
             raise ChainReorg
 
-        touched = set()
         self.fs_advance_block(header, tx_hashes, txs)
         self.tip = self.coin.header_hash(header)
         self.height += 1
         undo_info = self.advance_txs(tx_hashes, txs, touched)
-        if self.daemon.cached_height() - self.height <= self.reorg_limit:
+        if self.daemon.cached_height() - self.height <= self.env.reorg_limit:
             self.write_undo_info(self.height, b''.join(undo_info))
 
-        if update_touched:
-            self.touched.update(touched)
-
     def advance_txs(self, tx_hashes, txs, touched):
         put_utxo = self.utxo_cache.__setitem__
         spend_utxo = self.spend_utxo
@@ -623,7 +517,7 @@
 
         return undo_info
 
-    def backup_blocks(self, blocks):
+    def backup_blocks(self, blocks, touched):
         '''Backup the blocks and flush.
 
         The blocks should be in order of decreasing height.
@@ -632,7 +526,6 @@
         self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
         self.assert_flushed()
 
-        touched = set()
         for block in blocks:
             header, tx_hashes, txs = self.coin.read_block(block)
             header_hash = self.coin.header_hash(header)
@@ -649,7 +542,8 @@
 
         self.logger.info('backed up to height {:,d}'.format(self.height))
 
-        self.touched.update(touched)
+        # touched includes those passed into this function.  That will
+        # generally be empty but is harmless if not.
         flush_history = partial(self.backup_history, hash168s=touched)
         self.flush(True, flush_history=flush_history)
 
@@ -657,7 +551,7 @@
         # Prevout values, in order down the block (coinbase first if present)
         # undo_info is in reverse block order
         undo_info = self.read_undo_info(self.height)
-        if not undo_info:
+        if undo_info is None:
             raise ChainError('no undo information found for height {:,d}'
                              .format(self.height))
         n = len(undo_info)
@@ -794,6 +688,7 @@
         # UTXO state may have keys in common with our write cache or
         # may be in the DB already.
         flush_start = time.time()
+        delete_count = len(self.db_deletes) // 2
 
         batch_delete = batch.delete
         for key in self.db_deletes:
@@ -813,34 +708,15 @@
                          'adds, {:,d} spends in {:.1f}s, committing...'
                          .format(self.height - self.db_height,
                                  self.tx_count - self.db_tx_count,
-                                 len(self.utxo_cache),
-                                 len(self.db_deletes) // 2,
+                                 len(self.utxo_cache), delete_count,
                                  time.time() - flush_start))
 
         self.utxo_cache = {}
-        self.db_deletes = []
         self.utxo_flush_count = self.flush_count
         self.db_tx_count = self.tx_count
         self.db_height = self.height
         self.db_tip = self.tip
 
-    def read_headers(self, start, count):
-        # Read some from disk
-        disk_count = min(count, max(0, self.fs_height + 1 - start))
-        result = self.fs_read_headers(start, disk_count)
-        count -= disk_count
-        start += disk_count
-
-        # The rest from memory
-        if count:
-            start -= self.fs_height + 1
-            if not (count >= 0 and start + count <= len(self.headers)):
-                raise ChainError('{:,d} headers starting at {:,d} not on disk'
-                                 .format(count, start))
-            result += b''.join(self.headers[start: start + count])
-
-        return result
-
     def get_tx_hash(self, tx_num):
         '''Returns the tx_hash and height of a tx number.'''
         tx_hash, tx_height = self.fs_tx_hash(tx_num)
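The `if undo_info is None` change near the end of the block_processor.py
diff is the heart of the issue #46 fix: a coinbase-only block spends no
UTXOs, so its stored undo information is legitimately empty, and the old
truthiness test could not tell that apart from undo information that was
never written.  A minimal standalone illustration, with a dict standing
in for the real key-value store and an arbitrary height:

# Illustration of the issue #46 fix; not code from the repository.

undo_store = {100000: b''}     # coinbase-only block: empty undo info

def read_undo_info(height):
    # Like db.get(): returns None when the key was never written
    return undo_store.get(height)

undo_info = read_undo_info(100000)
assert not undo_info           # b'' is falsy, so `if not undo_info`
                               # wrongly treated this as missing
assert undo_info is not None   # the new test: present, merely empty

assert read_undo_info(100001) is None    # genuinely missing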
diff --git a/server/db.py b/server/db.py
index 609537e..f6672bc 100644
--- a/server/db.py
+++ b/server/db.py
@@ -10,6 +10,7 @@
 
 import array
 import ast
+import itertools
 import os
 from struct import pack, unpack
 from bisect import bisect_right
@@ -46,6 +47,8 @@ class DB(LoggedClass):
         self.logger.info('switching current directory to {}'
                          .format(env.db_dir))
         os.chdir(env.db_dir)
+        self.logger.info('reorg limit is {:,d} blocks'
+                         .format(self.env.reorg_limit))
 
         # Open DB and metadata files.  Record some of its state.
         db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
@@ -72,6 +75,7 @@
             assert self.db_tx_count == self.tx_counts[-1]
         else:
             assert self.db_tx_count == 0
+        self.clean_db()
 
     def read_state(self):
         if self.db.is_new:
@@ -116,7 +120,8 @@
         if self.first_sync:
             self.logger.info('sync time so far: {}'
                              .format(formatted_time(self.wall_time)))
-
+        if self.flush_count < self.utxo_flush_count:
+            raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
 
     def write_state(self, batch):
         '''Write chain state to the batch.'''
@@ -133,6 +138,68 @@
         }
         batch.put(b'state', repr(state).encode())
 
+    def clean_db(self):
+        '''Clean out stale DB items.
+
+        Stale DB items are excess history flushed since the most
+        recent UTXO flush (only happens on unclean shutdown), and aged
+        undo information.
+        '''
+        if self.flush_count > self.utxo_flush_count:
+            self.utxo_flush_count = self.flush_count
+            self.logger.info('DB shut down uncleanly.  Scanning for '
+                             'excess history flushes...')
+            history_keys = self.excess_history_keys()
+            self.logger.info('deleting {:,d} history entries'
+                             .format(len(history_keys)))
+        else:
+            history_keys = []
+
+        undo_keys = self.stale_undo_keys()
+        if undo_keys:
+            self.logger.info('deleting {:,d} stale undo entries'
+                             .format(len(undo_keys)))
+
+        with self.db.write_batch() as batch:
+            batch_delete = batch.delete
+            for key in history_keys:
+                batch_delete(key)
+            for key in undo_keys:
+                batch_delete(key)
+            self.write_state(batch)
+
+    def excess_history_keys(self):
+        prefix = b'H'
+        keys = []
+        for key, hist in self.db.iterator(prefix=prefix):
+            flush_id, = unpack('>H', key[-2:])
+            if flush_id > self.utxo_flush_count:
+                keys.append(key)
+        return keys
+
+    def stale_undo_keys(self):
+        prefix = b'U'
+        cutoff = self.db_height - self.env.reorg_limit
+        keys = []
+        for key, hist in self.db.iterator(prefix=prefix):
+            height, = unpack('>I', key[-4:])
+            if height > cutoff:
+                break
+            keys.append(key)
+        return keys
+
+    def undo_key(self, height):
+        '''DB key for undo information at the given height.'''
+        return b'U' + pack('>I', height)
+
+    def write_undo_info(self, height, undo_info):
+        '''Write out undo information for the current height.'''
+        self.db.put(self.undo_key(height), undo_info)
+
+    def read_undo_info(self, height):
+        '''Read undo information from a file for the current height.'''
+        return self.db.get(self.undo_key(height))
+
     def open_file(self, filename, create=False):
         '''Open the file name.  Return its handle.'''
         try:
@@ -142,7 +209,51 @@
                 return open(filename, 'wb+')
             raise
 
-    def fs_read_headers(self, start, count):
+    def fs_update(self, fs_height, headers, block_tx_hashes):
+        '''Write headers, the tx_count array and block tx hashes to disk.
+
+        Their first height is fs_height.  No recorded DB state is
+        updated.  These arrays are all append only, so in a crash we
+        just pick up again from the DB height.
+        '''
+        blocks_done = len(self.headers)
+        new_height = fs_height + blocks_done
+        prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
+        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
+        txs_done = cur_tx_count - prior_tx_count
+
+        assert len(self.tx_hashes) == blocks_done
+        assert len(self.tx_counts) == new_height + 1
+
+        # First the headers
+        self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN)
+        self.headers_file.write(b''.join(headers))
+        self.headers_file.flush()
+
+        # Then the tx counts
+        self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize)
+        self.txcount_file.write(self.tx_counts[fs_height + 1:])
+        self.txcount_file.flush()
+
+        # Finally the hashes
+        hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes)))
+        assert len(hashes) % 32 == 0
+        assert len(hashes) // 32 == txs_done
+        cursor = 0
+        file_pos = prior_tx_count * 32
+        while cursor < len(hashes):
+            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
+            size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
+            filename = 'hashes{:04d}'.format(file_num)
+            with self.open_file(filename, create=True) as f:
+                f.seek(offset)
+                f.write(hashes[cursor:cursor + size])
+            cursor += size
+            file_pos += size
+
+        os.sync()
+
+    def read_headers(self, start, count):
         '''Requires count >= 0.'''
         # Read some from disk
         disk_count = min(count, self.db_height + 1 - start)
@@ -172,7 +283,7 @@
         return f.read(32), tx_height
 
     def fs_block_hashes(self, height, count):
-        headers = self.fs_read_headers(height, count)
+        headers = self.read_headers(height, count)
         # FIXME: move to coins.py
         hlen = self.coin.HEADER_LEN
         return [self.coin.header_hash(header)
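For reference, the key scheme that clean_db() and its helpers iterate
over.  undo_key() below is copied from the db.py diff; the assertions
and commentary around it are illustrative only:

from struct import pack, unpack

def undo_key(height):
    '''DB key for undo information at the given height.'''
    return b'U' + pack('>I', height)

# Big-endian packing makes lexicographic key order match height order,
# so a prefix scan over b'U' visits undo entries oldest-first and
# stale_undo_keys() can break at the first height past the cutoff.
assert undo_key(100000) < undo_key(100001)

height, = unpack('>I', undo_key(450000)[-4:])
assert height == 450000

# History keys end in a 2-byte big-endian flush id instead, which is
# what excess_history_keys() unpacks with '>H' from key[-2:].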
diff --git a/server/env.py b/server/env.py
index 4c6716e..6160cde 100644
--- a/server/env.py
+++ b/server/env.py
@@ -45,7 +45,7 @@ class Env(LoggedClass):
         self.donation_address = self.default('DONATION_ADDRESS', '')
         self.db_engine = self.default('DB_ENGINE', 'leveldb')
         # Server limits to help prevent DoS
-        self.max_send = self.integer('MAX_SEND', 250000)
+        self.max_send = self.integer('MAX_SEND', 1000000)
         self.max_subs = self.integer('MAX_SUBS', 250000)
         self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
         # IRC
diff --git a/server/irc.py b/server/irc.py
index 6fed7e2..7930d07 100644
--- a/server/irc.py
+++ b/server/irc.py
@@ -80,9 +80,9 @@ class IRC(LoggedClass):
                       'namreply', 'disconnect']:
             reactor.add_global_handler(event, getattr(self, 'on_' + event))
 
+        connection = reactor.server()
         while True:
             try:
-                connection = reactor.server()
                 connection.connect(self.irc_server, self.irc_port,
                                    self.nick, ircname=self.real_name)
                 connection.set_keepalive(60)
diff --git a/server/protocol.py b/server/protocol.py
index b9f1884..7e95a19 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -311,7 +311,8 @@ class ServerManager(util.LoggedClass):
         for session in self.sessions:
             if isinstance(session, ElectrumX):
                 # Use a tuple to distinguish from JSON
-                session.messages.put_nowait((self.bp.height, touched, cache))
+                triple = (self.bp.db_height, touched, cache)
+                session.messages.put_nowait(triple)
 
     async def shutdown(self):
         '''Call to shutdown the servers.  Returns when done.'''
@@ -377,7 +378,7 @@
     async def rpc_getinfo(self, params):
         '''The RPC 'getinfo' call.'''
         return {
-            'blocks': self.bp.height,
+            'blocks': self.bp.db_height,
             'peers': len(self.irc.peers),
             'sessions': self.session_count(),
             'watched': self.subscription_count,
@@ -592,8 +593,8 @@
                          .format(self.peername(), len(matches)))
 
     def height(self):
-        '''Return the block processor's current height.'''
-        return self.bp.height
+        '''Return the current flushed database height.'''
+        return self.bp.db_height
 
     def current_electrum_header(self):
         '''Used as response to a headers subscription request.'''
@@ -836,6 +837,8 @@
             except Exception as e:
                 self.logger.error('reading banner file {}: {}'
                                   .format(self.env.banner_file, e))
+            else:
+                banner = banner.replace('$VERSION', VERSION)
         return banner
 
     async def donation_address(self, params):
diff --git a/server/storage.py b/server/storage.py
index 35e0a7e..b8a0785 100644
--- a/server/storage.py
+++ b/server/storage.py
@@ -77,7 +77,7 @@ class LevelDB(Storage):
 
     def open(self, name, create):
         self.db = self.module.DB(name, create_if_missing=create,
-                                 compression=None)
+                                 max_open_files=256, compression=None)
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
diff --git a/server/version.py b/server/version.py
index 8b4837f..1028f97 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.7.10"
+VERSION = "ElectrumX 0.7.11"
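Finally, a sketch of what the $VERSION banner substitution amounts to
end to end.  The replace() call is the one added to protocol.py above;
the banner path is illustrative, as the server reads whatever file
BANNER_FILE points at:

# Sketch of the banner handling; the path is made up for the example.
VERSION = "ElectrumX 0.7.11"

with open('/path/to/banner') as f:
    banner = f.read()
banner = banner.replace('$VERSION', VERSION)

# A banner line "Welcome to $VERSION!" is served to clients as
# "Welcome to ElectrumX 0.7.11!".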