Browse Source

Merge branch 'separate_db' into develop

master
Neil Booth 8 years ago
parent
commit
604698ad0d
  1. 7
      docs/ARCHITECTURE.rst
  2. 27
      electrumx_server.py
  3. 6
      query.py
  4. 206
      server/block_processor.py
  5. 8
      server/cache.py
  6. 94
      server/controller.py
  7. 148
      server/db.py

7
docs/ARCHITECTURE.rst

@ -90,10 +90,3 @@ IRC
Not currently imlpemented; will handle IRC communication for the Not currently imlpemented; will handle IRC communication for the
ElectrumX servers. ElectrumX servers.
Controller
----------
A historical artefact that currently coordinates some of the above
components. Not pictured as it is doesn't seem to have a logical
place and so is probably going away.

27
electrumx_server.py

@ -17,11 +17,11 @@ import traceback
from functools import partial from functools import partial
from server.env import Env from server.env import Env
from server.controller import Controller from server.block_processor import BlockServer
def cancel_tasks(loop): def close_loop(loop):
# Cancel and collect the remaining tasks '''Close the loop down cleanly. Cancel and collect remaining tasks.'''
tasks = asyncio.Task.all_tasks() tasks = asyncio.Task.all_tasks()
for task in tasks: for task in tasks:
task.cancel() task.cancel()
@ -31,41 +31,36 @@ def cancel_tasks(loop):
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
loop.close()
def main_loop(): def main_loop():
'''Get tasks; loop until complete.''' '''Start the server.'''
if os.geteuid() == 0: if os.geteuid() == 0:
raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user ' raise Exception('DO NOT RUN AS ROOT! Create an unpriveleged user '
'account and use that') 'account and use that')
env = Env()
logging.info('switching current directory to {}'.format(env.db_dir))
os.chdir(env.db_dir)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
#loop.set_debug(True) #loop.set_debug(True)
controller = Controller(loop, env)
# Signal handlers
def on_signal(signame): def on_signal(signame):
'''Call on receipt of a signal to cleanly shutdown.''' '''Call on receipt of a signal to cleanly shutdown.'''
logging.warning('received {} signal, preparing to shut down' logging.warning('received {} signal, preparing to shut down'
.format(signame)) .format(signame))
loop.stop() loop.stop()
# Install signal handlers
for signame in ('SIGINT', 'SIGTERM'): for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame), loop.add_signal_handler(getattr(signal, signame),
partial(on_signal, signame)) partial(on_signal, signame))
controller.start() server = BlockServer(Env())
server.start()
try: try:
loop.run_forever() loop.run_forever()
finally: finally:
controller.stop() server.stop()
cancel_tasks(loop) close_loop(loop)
loop.close()
def main(): def main():

6
query.py

@ -13,11 +13,10 @@ Not currently documented; might become easier to use in future.
''' '''
import os
import sys import sys
from server.env import Env from server.env import Env
from server.block_processor import BlockProcessor from server.DB import DB
from lib.hash import hash_to_str from lib.hash import hash_to_str
@ -40,9 +39,8 @@ def count_entries(db):
def main(): def main():
env = Env() env = Env()
bp = DB(env)
coin = env.coin coin = env.coin
os.chdir(env.db_dir)
bp = BlockProcessor(env, None)
if len(sys.argv) == 1: if len(sys.argv) == 1:
count_entries(bp.db) count_entries(bp.db)
return return

206
server/block_processor.py

@ -11,6 +11,7 @@
import array import array
import ast import ast
import asyncio import asyncio
import ssl
import struct import struct
import time import time
from bisect import bisect_left from bisect import bisect_left
@ -18,10 +19,12 @@ from collections import defaultdict, namedtuple
from functools import partial from functools import partial
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError from server.daemon import Daemon, DaemonError
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.tx import Deserializer from lib.tx import Deserializer
from lib.util import chunks, LoggedClass from lib.util import chunks, LoggedClass
import server.db
from server.storage import open_db from server.storage import open_db
@ -33,9 +36,6 @@ def formatted_time(t):
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class ChainError(Exception): class ChainError(Exception):
pass pass
@ -78,7 +78,11 @@ class Prefetcher(LoggedClass):
else: else:
return blocks, None return blocks, None
async def start(self): def start(self):
'''Start the prefetcher.'''
asyncio.ensure_future(self.main_loop())
async def main_loop(self):
'''Loop forever polling for more blocks.''' '''Loop forever polling for more blocks.'''
self.logger.info('starting daemon poll loop...') self.logger.info('starting daemon poll loop...')
while True: while True:
@ -283,21 +287,20 @@ class MemPool(LoggedClass):
return value return value
class BlockProcessor(LoggedClass): class BlockProcessor(server.db.DB):
'''Process blocks and update the DB state to match. '''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing. Employ a prefetcher to prefetch blocks in batches for processing.
Coordinate backing up in case of chain reorganisations. Coordinate backing up in case of chain reorganisations.
''' '''
def __init__(self, env, daemon, on_update=None): def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the '''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated. daemon and a new block arrives or the mempool is updated.'''
''' super().__init__(env)
super().__init__()
self.daemon = daemon self.daemon = Daemon(env.daemon_url, env.debug)
self.on_update = on_update self.daemon.debug_set_height(self.height)
self.mempool = MemPool(self) self.mempool = MemPool(self)
self.touched = set() self.touched = set()
@ -305,39 +308,16 @@ class BlockProcessor(LoggedClass):
self.utxo_MB = env.utxo_MB self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB self.hist_MB = env.hist_MB
self.next_cache_check = 0 self.next_cache_check = 0
self.coin = env.coin
self.reorg_limit = env.reorg_limit self.reorg_limit = env.reorg_limit
# Open DB and metadata files. Record some of its state. # Headers and tx_hashes have one entry per block
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.init_state()
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
self.history = defaultdict(partial(array.array, 'I')) self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0 self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin) self.prefetcher = Prefetcher(self.daemon, self.height)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height)
self.last_flush = time.time() self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count self.last_flush_tx_count = self.tx_count
# Redirected member funcs
self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# Log state # Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} ' self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} utxo flush count: {:,d} ' 'flush count: {:,d} utxo flush count: {:,d} '
@ -355,12 +335,13 @@ class BlockProcessor(LoggedClass):
self.clean_db() self.clean_db()
def coros(self): def start(self):
self.daemon.debug_set_height(self.height) '''Start the block processor.'''
return [self.start(), self.prefetcher.start()] asyncio.ensure_future(self.main_loop())
self.prefetcher.start()
async def start(self): async def main_loop(self):
'''External entry point for block processing. '''Main loop for block processing.
Safely flushes the DB on clean shutdown. Safely flushes the DB on clean shutdown.
''' '''
@ -385,6 +366,7 @@ class BlockProcessor(LoggedClass):
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
if caught_up: if caught_up:
await self.caught_up(mempool_hashes) await self.caught_up(mempool_hashes)
self.touched = set()
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg() await self.handle_chain_reorg()
@ -396,10 +378,7 @@ class BlockProcessor(LoggedClass):
if self.first_sync: if self.first_sync:
self.first_sync = False self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height)) self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
self.touched.update(await self.mempool.update(mempool_hashes)) self.touched.update(await self.mempool.update(mempool_hashes))
await self.on_update(self.height, self.touched)
self.touched = set()
async def handle_chain_reorg(self): async def handle_chain_reorg(self):
# First get all state on disk # First get all state on disk
@ -451,30 +430,6 @@ class BlockProcessor(LoggedClass):
return self.fs_cache.block_hashes(start, count) return self.fs_cache.block_hashes(start, count)
def init_state(self):
if self.db.is_new:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = self.db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise ChainError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True)
def clean_db(self): def clean_db(self):
'''Clean out stale DB items. '''Clean out stale DB items.
@ -839,13 +794,6 @@ class BlockProcessor(LoggedClass):
assert n == 0 assert n == 0
self.tx_count -= len(txs) self.tx_count -= len(txs)
@staticmethod
def resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def mempool_transactions(self, hash168): def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168. entries for the hash168.
@ -861,59 +809,59 @@ class BlockProcessor(LoggedClass):
''' '''
return self.mempool.value(hash168) return self.mempool.value(hash168)
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.get_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168): class BlockServer(BlockProcessor):
'''Returns the confirmed balance of an address.''' '''Like BlockProcessor but also starts servers when caught up.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000): def __init__(self, env):
'''Generator that yields all UTXOs for an address sorted in no '''on_update is awaitable, and called only when caught up with the
particular order. By default yields at most 1000 entries. daemon and a new block arrives or the mempool is updated.'''
Set limit to None to get them all. super().__init__(env)
''' self.servers = []
limit = self.resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
for k, v in self.db.iterator(prefix=prefix):
(tx_pos,) = unpack('<H', k[-2:])
for n in range(0, len(v), 12): async def caught_up(self, mempool_hashes):
if limit == 0: await super().caught_up(mempool_hashes)
return if not self.servers:
(tx_num,) = unpack('<I', v[n:n + 4]) await self.start_servers()
(value,) = unpack('<Q', v[n + 4:n + 12]) ElectrumX.notify(self.height, self.touched)
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value) async def start_servers(self):
limit -= 1 '''Start listening on RPC, TCP and SSL ports.
def get_utxos_sorted(self, hash168): Does not start a server if the port wasn't specified.
'''Returns all the UTXOs for an address sorted by height and '''
position in the block.''' env = self.env
return sorted(self.get_utxos(hash168, limit=None)) loop = asyncio.get_event_loop()
def get_utxo_hash168(self, tx_hash, index): JSONRPC.init(self, self.daemon, self.coin)
'''Returns the hash168 for a UTXO.'''
hash168 = None protocol = LocalRPC
if 0 <= index <= 65535: if env.rpc_port is not None:
idx_packed = struct.pack('<H', index) host = 'localhost'
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False) rpc_server = loop.create_server(protocol, host, env.rpc_port)
if hash168 == NO_CACHE_ENTRY: self.servers.append(await rpc_server)
hash168 = None self.logger.info('RPC server listening on {}:{:d}'
return hash168 .format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
self.servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()

8
server/cache.py

@ -83,9 +83,9 @@ class UTXOCache(LoggedClass):
''' '''
def __init__(self, parent, db, coin): def __init__(self, get_tx_hash, db, coin):
super().__init__() super().__init__()
self.parent = parent self.get_tx_hash = get_tx_hash
self.coin = coin self.coin = coin
self.cache = {} self.cache = {}
self.put = self.cache.__setitem__ self.put = self.cache.__setitem__
@ -137,7 +137,7 @@ class UTXOCache(LoggedClass):
assert len(data) % 12 == 0 assert len(data) % 12 == 0
for n in range(0, len(data), 12): for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4]) (tx_num, ) = struct.unpack('<I', data[n:n+4])
this_tx_hash, height = self.parent.get_tx_hash(tx_num) this_tx_hash, height = self.get_tx_hash(tx_num)
if tx_hash == this_tx_hash: if tx_hash == this_tx_hash:
result = hash168 + data[n:n+12] result = hash168 + data[n:n+12]
if delete: if delete:
@ -185,7 +185,7 @@ class UTXOCache(LoggedClass):
# Resolve the compressed key collision using the TX number # Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25): for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25]) (tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num) my_hash, height = self.get_tx_hash(tx_num)
if my_hash == tx_hash: if my_hash == tx_hash:
if delete: if delete:
self.cache_write(key, data[:n] + data[n+25:]) self.cache_write(key, data[:n] + data[n+25:])

94
server/controller.py

@ -1,94 +0,0 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Server controller.
Coordinates the parts of the server. Serves as a cache for
client-serving data such as histories.
'''
import asyncio
import ssl
from functools import partial
from server.daemon import Daemon
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.util import LoggedClass
class Controller(LoggedClass):
def __init__(self, loop, env):
'''Create up the controller.
Creates DB, Daemon and BlockProcessor instances.
'''
super().__init__()
self.loop = loop
self.env = env
self.coin = env.coin
self.daemon = Daemon(env.daemon_url, env.debug)
self.block_processor = BlockProcessor(env, self.daemon,
on_update=self.on_update)
JSONRPC.init(self.block_processor, self.daemon, self.coin)
self.servers = []
def start(self):
'''Prime the event loop with asynchronous jobs.'''
coros = self.block_processor.coros()
for coro in coros:
asyncio.ensure_future(coro)
async def on_update(self, height, touched):
if not self.servers:
self.servers = await self.start_servers()
ElectrumX.notify(height, touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified. Does
nothing if servers are already running.
'''
servers = []
env = self.env
loop = self.loop
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
return servers
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()

148
server/db.py

@ -0,0 +1,148 @@
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Interface to the blockchain database.'''
import array
import ast
import os
import struct
from collections import namedtuple
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from lib.util import LoggedClass
from server.storage import open_db
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class DB(LoggedClass):
'''Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if
it was shutdown uncleanly.
'''
def __init__(self, env):
super().__init__()
self.env = env
self.coin = env.coin
self.logger.info('switching current directory to {}'
.format(env.db_dir))
os.chdir(env.db_dir)
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.init_state_from_db()
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
# Cache wrapping the filesystem and redirected functions
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# UTXO cache
self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin)
def init_state_from_db(self):
if self.db.is_new:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
state = self.db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise ChainError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True)
@staticmethod
def _resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of confirmed transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self._resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.get_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self._resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
for k, v in self.db.iterator(prefix=prefix):
(tx_pos,) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num,) = unpack('<I', v[n:n + 4])
(value,) = unpack('<Q', v[n + 4:n + 12])
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_utxo_hash168(self, tx_hash, index):
'''Returns the hash168 for a UTXO.'''
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
if hash168 == NO_CACHE_ENTRY:
hash168 = None
return hash168
Loading…
Cancel
Save