
Merge branch 'release-0.2.2'

master 0.2.2
Neil Booth 8 years ago
parent
commit
6de049d14d
  1. docs/ARCHITECTURE.rst (19 changed lines)
  2. docs/RELEASE-NOTES (11 changed lines)
  3. electrumx_server.py (32 changed lines)
  4. lib/script.py (76 changed lines)
  5. lib/util.py (13 changed lines)
  6. query.py (6 changed lines)
  7. server/block_processor.py (343 changed lines)
  8. server/cache.py (168 changed lines)
  9. server/controller.py (107 changed lines)
  10. server/daemon.py (4 changed lines)
  11. server/db.py (220 changed lines)
  12. server/protocol.py (59 changed lines)
  13. server/version.py (2 changed lines)

docs/ARCHITECTURE.rst (19 changed lines)

@ -15,13 +15,13 @@ The components of the server are roughly like this::
- ElectrumX -<<<<<- LocalRPC -
------------- ------------
< >
---------- ------------------- ----------
- Daemon -<<<<<<<<- Block processor ->>>>- Caches -
---------- ------------------- ----------
---------- ------------------- --------------
- Daemon -<<<<<<<<- Block processor ->>>>- UTXO Cache -
---------- ------------------- --------------
< < > <
-------------- -----------
- Prefetcher - - Storage -
-------------- -----------
-------------- ----------------
- Prefetcher - - FS + Storage -
-------------- ----------------
Env
@ -90,10 +90,3 @@ IRC
Not currently implemented; will handle IRC communication for the
ElectrumX servers.
Controller
----------
A historical artefact that currently coordinates some of the above
components. Not pictured as it doesn't seem to have a logical
place and so is probably going away.

docs/RELEASE-NOTES (11 changed lines)

@ -1,3 +1,14 @@
Version 0.2.2
-------------
- mostly refactoring: controller.py is gone; cache.py is half-gone.
Split BlockProcessor into 3: DB, BlockProcessor and BlockServer. DB
handles stored DB and FS state; BlockProcessor handles pushing the
chain forward and caching of updates, and BlockServer will
additionally serve clients once caught up; see the sketch below. More to come.
- mempool: better logging; also yields during initial seeding
- issues fixed: #10
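
For orientation, here is a minimal sketch of the layering described above, using only the class names and relationships visible in this diff (method lists abridged, bodies elided):

# Sketch of the 0.2.2 layering; abridged, not the full classes.
class DB:                           # server/db.py: stored DB and FS state, read-only queries
    def __init__(self, env): ...
    def fs_read_headers(self, start, count): ...
    def fs_tx_hash(self, tx_num): ...
    def get_history(self, hash168, limit=1000): ...
    def get_utxos(self, hash168, limit=1000): ...

class BlockProcessor(DB):           # server/block_processor.py: pushes the chain forward, caches updates
    def __init__(self, env): ...
    async def main_loop(self): ...
    async def caught_up(self, mempool_hashes): ...
    def advance_block(self, block, update_touched): ...

class BlockServer(BlockProcessor):  # server/protocol.py: additionally serves clients once caught up
    async def caught_up(self, mempool_hashes): ...  # starts RPC/TCP/SSL servers, notifies sessions
    def stop(self): ...                             # closes the listening servers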
Version 0.2.1
-------------

electrumx_server.py (32 changed lines)

@ -9,39 +9,45 @@
'''Script to kick off the server.'''
import asyncio
import logging
import os
import signal
import traceback
from functools import partial
from server.env import Env
from server.controller import Controller
from server.protocol import BlockServer
def main_loop():
'''Get tasks; loop until complete.'''
'''Start the server.'''
if os.geteuid() == 0:
raise Exception('DO NOT RUN AS ROOT! Create an unprivileged user '
'account and use that')
env = Env()
logging.info('switching current directory to {}'.format(env.db_dir))
os.chdir(env.db_dir)
loop = asyncio.get_event_loop()
#loop.set_debug(True)
controller = Controller(loop, env)
controller.start()
def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, shutting down'.format(signame))
for task in asyncio.Task.all_tasks():
task.cancel()
# Install signal handlers
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(on_signal, signame))
tasks = asyncio.Task.all_tasks(loop)
server = BlockServer(Env())
future = server.start()
try:
loop.run_until_complete(asyncio.gather(*tasks))
loop.run_until_complete(future)
except asyncio.CancelledError:
logging.warning('task cancelled; asyncio event loop closing')
pass
finally:
controller.stop()
server.stop()
loop.close()

lib/script.py (76 changed lines)

@ -12,8 +12,6 @@ import struct
from collections import namedtuple
from lib.enum import Enumeration
from lib.hash import hash160
from lib.util import cachedproperty
class ScriptError(Exception):
@ -58,80 +56,6 @@ assert OpCodes.OP_CHECKSIG == 0xac
assert OpCodes.OP_CHECKMULTISIG == 0xae
class ScriptSig(object):
'''A script from a tx input.
Typically provides one or more signatures.'''
SIG_ADDRESS, SIG_MULTI, SIG_PUBKEY, SIG_UNKNOWN = range(4)
def __init__(self, script, coin, kind, sigs, pubkeys):
self.script = script
self.coin = coin
self.kind = kind
self.sigs = sigs
self.pubkeys = pubkeys
@cachedproperty
def address(self):
if self.kind == SIG_ADDRESS:
return self.coin.address_from_pubkey(self.pubkeys[0])
if self.kind == SIG_MULTI:
return self.coin.multsig_address(self.pubkeys)
return 'Unknown'
@classmethod
def from_script(cls, script, coin):
'''Return an instance of this class.
Return an object with kind SIG_UNKNOWN for unrecognised scripts.'''
try:
return cls.parse_script(script, coin)
except ScriptError:
return cls(script, coin, SIG_UNKNOWN, [], [])
@classmethod
def parse_script(cls, script, coin):
'''Return an instance of this class.
Raises on unrecognised scripts.'''
ops, datas = Script.get_ops(script)
# Address, PubKey and P2SH redeems only push data
if not ops or not Script.match_ops(ops, [-1] * len(ops)):
raise ScriptError('unknown scriptsig pattern')
# Assume double data pushes are address redeems, single data
# pushes are pubkey redeems
if len(ops) == 2: # Signature, pubkey
return cls(script, coin, SIG_ADDRESS, [datas[0]], [datas[1]])
if len(ops) == 1: # Pubkey
return cls(script, coin, SIG_PUBKEY, [datas[0]], [])
# Presumably it is P2SH (though conceivably the above could be
# too; cannot be sure without the send-to script). We only
# handle CHECKMULTISIG P2SH, which because of a bitcoin core
# bug always start with an unused OP_0.
if ops[0] != OpCodes.OP_0:
raise ScriptError('unknown scriptsig pattern; expected OP_0')
# OP_0, Sig1, ..., SigM, pk_script
m = len(ops) - 2
pk_script = datas[-1]
pk_ops, pk_datas = Script.get_ops(script)
# OP_2 pubkey1 pubkey2 pubkey3 OP_3 OP_CHECKMULTISIG
n = len(pk_ops) - 3
pattern = ([OpCodes.OP_1 + m - 1] + [-1] * n
+ [OpCodes.OP_1 + n - 1, OpCodes.OP_CHECKMULTISIG])
if m <= n and Script.match_ops(pk_ops, pattern):
return cls(script, coin, SIG_MULTI, datas[1:-1], pk_datas[1:-2])
raise ScriptError('unknown multisig P2SH pattern')
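For concreteness, a worked 2-of-3 example of the matching logic in the removed parse_script above (opcode values come from the OpCodes enumeration in lib/script.py):

# scriptSig ops after Script.get_ops():  OP_0, <sig1>, <sig2>, <redeem_script>
#   m = len(ops) - 2 = 2, and pk_script = datas[-1] is the redeem script
# redeem script ops:  OP_2, <pubkey1>, <pubkey2>, <pubkey3>, OP_3, OP_CHECKMULTISIG
#   n = len(pk_ops) - 3 = 3, so the expected pattern is
#   [OpCodes.OP_1 + 1] + [-1, -1, -1] + [OpCodes.OP_1 + 2, OpCodes.OP_CHECKMULTISIG]
#   i.e. OP_2, three data pushes, OP_3, OP_CHECKMULTISIG
# Since m (2) <= n (3) and the pattern matches, the script is classed as
# SIG_MULTI with sigs = datas[1:-1] and pubkeys = pk_datas[1:-2].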
class ScriptPubKey(object):
'''A class for handling a tx output script that gives conditions
necessary for spending.

lib/util.py (13 changed lines)

@ -82,6 +82,7 @@ def subclasses(base_class, strict=True):
pairs = inspect.getmembers(sys.modules[base_class.__module__], select)
return [pair[1] for pair in pairs]
def chunks(items, size):
'''Break up items, an iterable, into chunks of length size.'''
for i in range(0, len(items), size):
@ -90,20 +91,12 @@ def chunks(items, size):
def bytes_to_int(be_bytes):
'''Interprets a big-endian sequence of bytes as an integer'''
assert isinstance(be_bytes, (bytes, bytearray))
value = 0
for byte in be_bytes:
value = value * 256 + byte
return value
return int.from_bytes(be_bytes, 'big')
def int_to_bytes(value):
'''Converts an integer to a big-endian sequence of bytes'''
mods = []
while value:
value, mod = divmod(value, 256)
mods.append(mod)
return bytes(reversed(mods))
return value.to_bytes((value.bit_length() + 7) // 8, 'big')
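
The two helpers above are rewritten in terms of the standard int.from_bytes / int.to_bytes; a quick check of the equivalence (note that a value of 0 gives b'' from both the old loop and the new one-liner):

# Round-trip check for the refactored helpers.
assert int.from_bytes(b'\x01\x00', 'big') == 256

value = 256
assert value.to_bytes((value.bit_length() + 7) // 8, 'big') == b'\x01\x00'

zero = 0
assert zero.to_bytes((zero.bit_length() + 7) // 8, 'big') == b''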
def increment_byte_string(bs):

query.py (6 changed lines)

@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future.
'''
import os
import sys
from server.env import Env
from server.block_processor import BlockProcessor
from server.db import DB
from lib.hash import hash_to_str
@ -40,9 +39,8 @@ def count_entries(db):
def main():
env = Env()
bp = DB(env)
coin = env.coin
os.chdir(env.db_dir)
bp = BlockProcessor(env, None)
if len(sys.argv) == 1:
count_entries(bp.db)
return

server/block_processor.py (343 changed lines)

@ -9,19 +9,21 @@
import array
import ast
import asyncio
import itertools
import os
import struct
import time
from bisect import bisect_left
from collections import defaultdict, namedtuple
from collections import defaultdict
from functools import partial
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError
from server.cache import UTXOCache, NO_CACHE_ENTRY
from server.daemon import Daemon, DaemonError
from lib.hash import hash_to_str
from lib.tx import Deserializer
from lib.util import chunks, LoggedClass
import server.db
from server.storage import open_db
@ -33,9 +35,6 @@ def formatted_time(t):
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class ChainError(Exception):
pass
@ -78,7 +77,7 @@ class Prefetcher(LoggedClass):
else:
return blocks, None
async def start(self):
async def main_loop(self):
'''Loop forever polling for more blocks.'''
self.logger.info('starting daemon poll loop...')
while True:
@ -176,9 +175,11 @@ class MemPool(LoggedClass):
'''
hex_hashes = set(hex_hashes)
touched = set()
missing_utxos = 0
if self.count < 0:
self.logger.info('initial fetch of {:,d} daemon mempool txs'
initial = self.count < 0
if initial:
self.logger.info('beginning import of {:,d} mempool txs'
.format(len(hex_hashes)))
# Remove gone items
@ -198,6 +199,8 @@ class MemPool(LoggedClass):
# them into a dictionary of hex hash to deserialized tx.
hex_hashes.difference_update(self.txs)
raw_txs = await self.bp.daemon.getrawtransactions(hex_hashes)
if initial:
self.logger.info('all fetched, now analysing...')
new_txs = {hex_hash: Deserializer(raw_tx).read_tx()
for hex_hash, raw_tx in zip(hex_hashes, raw_txs) if raw_tx}
del raw_txs, hex_hashes
@ -210,7 +213,10 @@ class MemPool(LoggedClass):
def txout_pair(txout):
return (script_hash168(txout.pk_script), txout.value)
for hex_hash, tx in new_txs.items():
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 500 == 0:
await asyncio.sleep(0)
txout_pairs = [txout_pair(txout) for txout in tx.outputs]
self.txs[hex_hash] = (None, txout_pairs, None)
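
The n % 500 yield above is a standard cooperative-scheduling trick: a long CPU-bound pass over the mempool would otherwise block the event loop, starving signal handlers and other tasks such as the prefetcher. A minimal, self-contained sketch of the pattern (process_many and handle are illustrative names, not part of ElectrumX):

import asyncio

async def process_many(items, handle, yield_every=500):
    '''Run handle(item) over items, periodically yielding to the event loop.'''
    for n, item in enumerate(items):
        if n % yield_every == 0:
            await asyncio.sleep(0)   # cooperative yield; no real delay
        handle(item)

# Usage sketch:
#   asyncio.get_event_loop().run_until_complete(
#       process_many(range(10000), print))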
@ -221,22 +227,38 @@ class MemPool(LoggedClass):
return mempool_entry[1][txin.prev_idx], True
entry = utxo_lookup(txin.prev_hash, txin.prev_idx)
if entry == NO_CACHE_ENTRY:
# Not possible unless daemon is lying or we're corrupted?
self.logger.warning('no UTXO found for {} / {}'
.format(hash_to_str(txin.prev_hash),
txin.prev_idx))
# This happens when the daemon is a block ahead of us
# and has mempool txs spending new txs in that block
raise MissingUTXOError
value, = struct.unpack('<Q', entry[-8:])
return (entry[:21], value), False
if initial:
next_log = time.time()
self.logger.info('processed outputs, now examining inputs. '
'This can take some time...')
# Now add the inputs
for hex_hash, tx in new_txs.items():
for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals
if n % 50 == 0:
await asyncio.sleep(0)
if initial and time.time() > next_log:
next_log = time.time() + 10
self.logger.info('{:,d} done ({:d}%)'
.format(n, int(n / len(new_txs) * 100)))
txout_pairs = self.txs[hex_hash][1]
try:
infos = (txin_info(txin) for txin in tx.inputs)
txin_pairs, unconfs = zip(*infos)
except MissingUTXOError:
# If we were missing a UTXO for some reason drop this tx
# Drop this TX. If other mempool txs depend on it
# it's harmless - next time the mempool is refreshed
# they'll either be cleaned up or the UTXOs will no
# longer be missing.
missing_utxos += 1
del self.txs[hex_hash]
continue
self.txs[hex_hash] = (txin_pairs, txout_pairs, any(unconfs))
@ -249,6 +271,11 @@ class MemPool(LoggedClass):
self.hash168s[hash168].add(hex_hash)
touched.add(hash168)
if missing_utxos:
self.logger.info('{:,d} txs had missing UTXOs; probably the '
'daemon is a block or two ahead of us'
.format(missing_utxos))
self.count += 1
if self.count % 25 == 0 or gone:
self.count = 0
@ -283,21 +310,25 @@ class MemPool(LoggedClass):
return value
class BlockProcessor(LoggedClass):
class BlockProcessor(server.db.DB):
'''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing.
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon, on_update=None):
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.
'''
super().__init__()
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env)
self.daemon = daemon
self.on_update = on_update
# These are our state as we move ahead of DB state
self.height = self.db_height
self.tip = self.db_tip
self.tx_count = self.db_tx_count
self.daemon = Daemon(env.daemon_url, env.debug)
self.daemon.debug_set_height(self.height)
self.mempool = MemPool(self)
self.touched = set()
@ -305,38 +336,20 @@ class BlockProcessor(LoggedClass):
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.coin = env.coin
self.reorg_limit = env.reorg_limit
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.init_state()
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height)
self.prefetcher = Prefetcher(self.daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
# Redirected member funcs
self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# Caches of unflushed items
self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin)
self.headers = []
self.tx_hashes = []
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@ -355,12 +368,13 @@ class BlockProcessor(LoggedClass):
self.clean_db()
def coros(self):
self.daemon.debug_set_height(self.height)
return [self.start(), self.prefetcher.start()]
def start(self):
'''Returns a future that starts the block processor when awaited.'''
return asyncio.gather(self.main_loop(),
self.prefetcher.main_loop())
async def start(self):
'''External entry point for block processing.
async def main_loop(self):
'''Main loop for block processing.
Safely flushes the DB on clean shutdown.
'''
@ -385,6 +399,7 @@ class BlockProcessor(LoggedClass):
await asyncio.sleep(0) # Yield
if caught_up:
await self.caught_up(mempool_hashes)
self.touched = set()
except ChainReorg:
await self.handle_chain_reorg()
@ -396,10 +411,7 @@ class BlockProcessor(LoggedClass):
if self.first_sync:
self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
self.touched.update(await self.mempool.update(mempool_hashes))
await self.on_update(self.height, self.touched)
self.touched = set()
self.touched.update(await self.mempool.update(mempool_hashes))
async def handle_chain_reorg(self):
# First get all state on disk
@ -432,7 +444,7 @@ class BlockProcessor(LoggedClass):
start = self.height - 1
count = 1
while start > 0:
hashes = self.fs_cache.block_hashes(start, count)
hashes = self.fs_block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = match_pos(hex_hashes, d_hex_hashes)
@ -449,31 +461,7 @@ class BlockProcessor(LoggedClass):
'height {:,d} to height {:,d}'
.format(count, start, start + count - 1))
return self.fs_cache.block_hashes(start, count)
def init_state(self):
if self.db.is_new:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = self.db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise ChainError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True)
return self.fs_block_hashes(start, count)
def clean_db(self):
'''Clean out stale DB items.
@ -547,9 +535,6 @@ class BlockProcessor(LoggedClass):
self.height - self.db_height))
self.utxo_cache.flush(batch)
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
def assert_flushed(self):
'''Asserts state is fully flushed.'''
@ -567,37 +552,40 @@ class BlockProcessor(LoggedClass):
self.assert_flushed()
return
self.flush_count += 1
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count
show_stats = self.first_sync
# Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to
# have updated the DB.
if self.height > self.db_height:
assert flush_history is None
flush_history = self.flush_history
self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
flush_history(batch)
if flush_utxos:
self.fs_flush()
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('committing transaction...')
# Update our in-memory state after successful flush
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
self.tx_hashes = []
self.headers = []
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.db)
flush_time = int(self.last_flush - flush_start)
self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s'
.format(self.flush_count, self.height, self.tx_count,
flush_time))
int(self.last_flush - flush_start)))
# Catch-up stats
if show_stats:
@ -623,21 +611,68 @@ class BlockProcessor(LoggedClass):
formatted_time(tx_est / this_tx_per_sec)))
def flush_history(self, batch):
self.logger.info('flushing history')
self.flush_count += 1
flush_start = time.time()
flush_id = struct.pack('>H', self.flush_count)
for hash168, hist in self.history.items():
key = b'H' + hash168 + flush_id
batch.put(key, hist.tobytes())
self.logger.info('{:,d} history entries in {:,d} addrs'
.format(self.history_size, len(self.history)))
self.logger.info('flushed {:,d} history entries for {:,d} addrs '
'in {:,d}s'
.format(self.history_size, len(self.history),
int(time.time() - flush_start)))
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def fs_flush(self):
'''Flush the things stored on the filesystem.'''
flush_start = time.time()
blocks_done = len(self.headers)
prior_tx_count = (self.tx_counts[self.db_height]
if self.db_height >= 0 else 0)
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.db_height + blocks_done == self.height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == self.height + 1
assert cur_tx_count == self.tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.db_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.db_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
self.logger.info('FS flush took {:.1f} seconds'
.format(time.time() - flush_start))
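
The tx hashes written above are spread across fixed-size files named hashes0000, hashes0001, ..., each tx_hash_file_size (16 MiB) long, with divmod mapping an absolute byte position to a (file number, offset) pair. An illustrative helper following the same arithmetic (the function name is made up for this sketch):

TX_HASH_FILE_SIZE = 16 * 1024 * 1024   # matches tx_hash_file_size in server/db.py

def hash_file_location(tx_num):
    '''Return (filename, offset) of the 32-byte hash of transaction number tx_num.'''
    file_pos = tx_num * 32
    file_num, offset = divmod(file_pos, TX_HASH_FILE_SIZE)
    return 'hashes{:04d}'.format(file_num), offset

# e.g. hash_file_location(1000000) == ('hashes0001', 15222784)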
def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
@ -707,9 +742,18 @@ class BlockProcessor(LoggedClass):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def fs_advance_block(self, header, tx_hashes, txs):
'''Update unflushed FS state for a new block.'''
prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
# Cache the new header, tx hashes and cumulative tx count
self.headers.append(header)
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs))
def advance_block(self, block, update_touched):
# We must update the fs_cache before calling advance_txs() as
# the UTXO cache uses the fs_cache via get_tx_hash() to
# We must update the FS cache before calling advance_txs() as
# the UTXO cache uses the FS cache via get_tx_hash() to
# resolve compressed key collisions
header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
@ -717,7 +761,7 @@ class BlockProcessor(LoggedClass):
raise ChainReorg
touched = set()
self.fs_cache.advance_block(header, tx_hashes, txs)
self.fs_advance_block(header, tx_hashes, txs)
self.tip = header_hash
self.height += 1
undo_info = self.advance_txs(tx_hashes, txs, touched)
@ -797,10 +841,13 @@ class BlockProcessor(LoggedClass):
hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs, touched)
self.fs_cache.backup_block()
self.tip = prev_hash
assert self.height >= 0
self.height -= 1
assert not self.headers
assert not self.tx_hashes
self.logger.info('backed up to height {:,d}'.format(self.height))
self.touched.update(touched)
@ -839,12 +886,33 @@ class BlockProcessor(LoggedClass):
assert n == 0
self.tx_count -= len(txs)
@staticmethod
def resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def read_headers(self, start, count):
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)
result = self.fs_read_headers(start, disk_count)
count -= disk_count
start += disk_count
# The rest from memory
if count:
start -= self.db_height + 1
if not (count >= 0 and start + count <= len(self.headers)):
raise ChainError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
result += b''.join(self.headers[start: start + count])
return result
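
A worked example of the disk/memory split in read_headers above, with assumed numbers (db_height = 99 and two unflushed headers held in self.headers):

# read_headers(98, 4) with db_height = 99 and len(self.headers) == 2:
#   disk_count = min(4, 99 + 1 - 98) = 2   -> headers 98 and 99 come from disk
#   count = 4 - 2 = 2; start = 98 + 2 = 100
#   start -= 99 + 1  -> 0                  -> self.headers[0:2] supplies the rest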
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
tx_hash, tx_height = self.fs_tx_hash(tx_num)
# Is this unflushed?
if tx_hash is None:
tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
return tx_hash, tx_height
def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -860,60 +928,3 @@ class BlockProcessor(LoggedClass):
Can be positive or negative.
'''
return self.mempool.value(hash168)
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.get_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
for k, v in self.db.iterator(prefix=prefix):
(tx_pos,) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num,) = unpack('<I', v[n:n + 4])
(value,) = unpack('<Q', v[n + 4:n + 12])
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_utxo_hash168(self, tx_hash, index):
'''Returns the hash168 for a UTXO.'''
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
if hash168 == NO_CACHE_ENTRY:
hash168 = None
return hash168

server/cache.py (168 changed lines)

@ -12,14 +12,10 @@ Once synced flushes are performed after processing each block.
'''
import array
import itertools
import os
import struct
from bisect import bisect_right
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256, hash_to_str
from lib.util import LoggedClass
from lib.hash import hash_to_str
# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
@ -83,9 +79,9 @@ class UTXOCache(LoggedClass):
'''
def __init__(self, parent, db, coin):
def __init__(self, get_tx_hash, db, coin):
super().__init__()
self.parent = parent
self.get_tx_hash = get_tx_hash
self.coin = coin
self.cache = {}
self.put = self.cache.__setitem__
@ -137,7 +133,7 @@ class UTXOCache(LoggedClass):
assert len(data) % 12 == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
this_tx_hash, height = self.parent.get_tx_hash(tx_num)
this_tx_hash, height = self.get_tx_hash(tx_num)
if tx_hash == this_tx_hash:
result = hash168 + data[n:n+12]
if delete:
@ -185,7 +181,7 @@ class UTXOCache(LoggedClass):
# Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num)
my_hash, height = self.get_tx_hash(tx_num)
if my_hash == tx_hash:
if delete:
self.cache_write(key, data[:n] + data[n+25:])
@ -259,155 +255,3 @@ class UTXOCache(LoggedClass):
.format(new_utxos, self.db_deletes,
hcolls, ucolls))
self.cache_spends = self.db_deletes = 0
class FSCache(LoggedClass):
def __init__(self, coin, height, tx_count):
super().__init__()
self.coin = coin
self.tx_hash_file_size = 16 * 1024 * 1024
assert self.tx_hash_file_size % 32 == 0
# On-disk values, updated by a flush
self.height = height
# Unflushed items
self.headers = []
self.tx_hashes = []
is_new = height == -1
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
if self.tx_counts:
assert tx_count == self.tx_counts[-1]
else:
assert tx_count == 0
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def advance_block(self, header, tx_hashes, txs):
'''Update the FS cache for a new block.'''
prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
# Cache the new header, tx hashes and cumulative tx count
self.headers.append(header)
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs))
def backup_block(self):
'''Revert a block.'''
assert not self.headers
assert not self.tx_hashes
assert self.height >= 0
# Just update in-memory. It doesn't matter if disk files are
# too long, they will be overwritten when advancing.
self.height -= 1
self.tx_counts.pop()
def flush(self, new_height, new_tx_count):
'''Flush the things stored on the filesystem.
The arguments are passed for sanity check assertions only.'''
self.logger.info('flushing to file system')
blocks_done = len(self.headers)
prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.height + blocks_done == new_height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
assert cur_tx_count == new_tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
self.tx_hashes = []
self.headers = []
self.height += blocks_done
def read_headers(self, start, count):
result = b''
# Read some from disk
disk_count = min(count, self.height + 1 - start)
if disk_count > 0:
header_len = self.coin.HEADER_LEN
assert start >= 0
self.headers_file.seek(start * header_len)
result = self.headers_file.read(disk_count * header_len)
count -= disk_count
start += disk_count
# The rest from memory
start -= self.height + 1
assert count >= 0 and start + count <= len(self.headers)
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height > self.height:
tx_hashes = self.tx_hashes[height - (self.height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)
return tx_hash, height
def block_hashes(self, height, count):
headers = self.read_headers(height, count)
hlen = self.coin.HEADER_LEN
return [double_sha256(header) for header in chunks(headers, hlen)]

server/controller.py (107 changed lines)

@ -1,107 +0,0 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Server controller.
Coordinates the parts of the server. Serves as a cache for
client-serving data such as histories.
'''
import asyncio
import signal
import ssl
from functools import partial
from server.daemon import Daemon
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.util import LoggedClass
class Controller(LoggedClass):
def __init__(self, loop, env):
'''Create the controller.
Creates DB, Daemon and BlockProcessor instances.
'''
super().__init__()
self.loop = loop
self.env = env
self.coin = env.coin
self.daemon = Daemon(env.daemon_url, env.debug)
self.block_processor = BlockProcessor(env, self.daemon,
on_update=self.on_update)
JSONRPC.init(self.block_processor, self.daemon, self.coin)
self.servers = []
def start(self):
'''Prime the event loop with asynchronous jobs.'''
coros = self.block_processor.coros()
for coro in coros:
asyncio.ensure_future(coro)
# Signal handlers
for signame in ('SIGINT', 'SIGTERM'):
self.loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
async def on_update(self, height, touched):
if not self.servers:
self.servers = await self.start_servers()
ElectrumX.notify(height, touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified. Does
nothing if servers are already running.
'''
servers = []
env = self.env
loop = self.loop
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
return servers
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
def on_signal(self, signame):
'''Call on receipt of a signal to cleanly shutdown.'''
self.logger.warning('received {} signal, preparing to shut down'
.format(signame))
for task in asyncio.Task.all_tasks(self.loop):
task.cancel()

server/daemon.py (4 changed lines)

@ -86,11 +86,11 @@ class Daemon(util.LoggedClass):
except aiohttp.ClientHttpProcessingError:
msg = 'HTTP error'
except aiohttp.ServerDisconnectedError:
msg = 'daemon disconnected'
msg = 'disconnected'
except aiohttp.ClientConnectionError:
msg = 'connection problem - is your daemon running?'
except DaemonWarmingUpError:
msg = 'daemon is still warming up'
msg = 'still starting up checking blocks...'
if msg != prior_msg or count == 10:
self.logger.error('{}. Retrying between sleeps...'

server/db.py (220 changed lines)

@ -0,0 +1,220 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Interface to the blockchain database.'''
import array
import ast
import os
import struct
from bisect import bisect_right
from collections import namedtuple
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256
from server.storage import open_db
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class DB(LoggedClass):
'''Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if
it was shutdown uncleanly.
'''
class DBError(Exception):
pass
def __init__(self, env):
super().__init__()
self.env = env
self.coin = env.coin
self.logger.info('switching current directory to {}'
.format(env.db_dir))
os.chdir(env.db_dir)
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.init_state_from_db()
create = self.db_height == -1
self.headers_file = self.open_file('headers', create)
self.txcount_file = self.open_file('txcount', create)
self.tx_hash_file_size = 16 * 1024 * 1024
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.db_height + 1)
if self.tx_counts:
assert self.db_tx_count == self.tx_counts[-1]
else:
assert self.db_tx_count == 0
def init_state_from_db(self):
if self.db.is_new:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = self.db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise self.DBError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True)
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def fs_read_headers(self, start, count):
# Read some from disk
disk_count = min(count, self.db_height + 1 - start)
if start < 0 or count < 0 or disk_count != count:
raise self.DBError('{:,d} headers starting at {:,d} not on disk'
.format(count, start))
if disk_count:
header_len = self.coin.HEADER_LEN
self.headers_file.seek(start * header_len)
return self.headers_file.read(disk_count * header_len)
return b''
def fs_tx_hash(self, tx_num):
'''Return a pair (tx_hash, tx_height) for the given tx number.
If the tx_height is not on disk, returns (None, tx_height).'''
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height:
return None, tx_height
raise self.DBError('tx_num {:,d} is not on disk')
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
return f.read(32), tx_height
def fs_block_hashes(self, height, count):
headers = self.fs_read_headers(height, count)
# FIXME: move to coins.py
hlen = self.coin.HEADER_LEN
return [double_sha256(header) for header in chunks(headers, hlen)]
@staticmethod
def _resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self._resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.fs_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self._resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
for k, v in self.db.iterator(prefix=prefix):
(tx_pos,) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num,) = unpack('<I', v[n:n + 4])
(value,) = unpack('<Q', v[n + 4:n + 12])
tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1
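
For reference, the layout get_utxos() is unpacking: the key is b'u' + hash168 + a 2-byte little-endian output index, and the value is a run of 12-byte entries, each a 4-byte tx number followed by an 8-byte value. A small packing sketch under those assumptions (the placeholder values are made up):

import struct

hash168 = b'\x00' * 21                      # placeholder 21-byte address hash
tx_pos, tx_num, value = 3, 123456, 5000000000

key = b'u' + hash168 + struct.pack('<H', tx_pos)
entry = struct.pack('<I', tx_num) + struct.pack('<Q', value)

assert len(entry) == 12                     # matches the 12-byte stride above
assert struct.unpack('<I', entry[:4])[0] == tx_num
assert struct.unpack('<Q', entry[4:12])[0] == value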
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_utxo_hash168(self, tx_hash, index):
'''Returns the hash168 for a UTXO.'''
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.hash168(tx_hash, idx_packed)
return hash168
def hash168(self, tx_hash, idx_packed):
'''Return the hash168 paid to by the given TXO.
Return None if not found.'''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
data = self.db.get(key)
if data is None:
return None
if len(data) == 25:
return data[:21]
assert len(data) % 25 == 0
# Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25):
tx_num, = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.fs_tx_hash(tx_num)
if my_hash == tx_hash:
return data[n:n+21]
raise self.DBError('could not resolve hash168 collision')
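
A comment-level sketch of the collision handling above (ADDR_TX_HASH_LEN is the truncated-hash length used in 'h' keys; its value is defined elsewhere and not shown in this diff):

# Key:   b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
# Value: one or more 25-byte entries, each 21-byte hash168 + 4-byte LE tx number.
#
# With two entries behind the same truncated key:
#   data = entry_a + entry_b        # len(data) == 50, a multiple of 25
# the loop unpacks the tx number from data[n+21:n+25] for n in (0, 25), looks up
# the full hash via fs_tx_hash(tx_num), and returns data[n:n+21] for the entry
# whose full hash equals the queried tx_hash.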

server/protocol.py (59 changed lines)

@ -11,10 +11,12 @@
import asyncio
import codecs
import json
import ssl
import traceback
from collections import namedtuple
from functools import partial
from server.block_processor import BlockProcessor
from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass
@ -30,6 +32,63 @@ def json_notification(method, params):
return {'id': None, 'method': method, 'params': params}
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env)
self.servers = []
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
ElectrumX.notify(self.height, self.touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified.
'''
env = self.env
loop = asyncio.get_event_loop()
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
self.servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
AsyncTask = namedtuple('AsyncTask', 'session job')
class SessionManager(LoggedClass):

server/version.py (2 changed lines)

@ -1 +1 @@
VERSION = "ElectrumX 0.2.1"
VERSION = "ElectrumX 0.2.2"
