Browse Source

Merge branch 'release-0.02'

master
Neil Booth 8 years ago
parent
commit
43107a8140
  1. 63
      HOWTO.rst
  2. 24
      README.rst
  3. 11
      RELEASE-NOTES
  4. 62
      electrumx_rpc.py
  5. 13
      lib/coins.py
  6. 10
      lib/hash.py
  7. 47
      lib/tx.py
  8. 8
      lib/util.py
  9. 37
      query.py
  10. 22
      samples/scripts/NOTES
  11. 0
      samples/scripts/env/DAEMON_HOST
  12. 0
      samples/scripts/env/DAEMON_PASSWORD
  13. 0
      samples/scripts/env/DAEMON_PORT
  14. 0
      samples/scripts/env/DAEMON_USERNAME
  15. 0
      samples/scripts/env/UTXO_MB
  16. 11
      samples/systemd-unit
  17. 714
      server/block_processor.py
  18. 400
      server/cache.py
  19. 165
      server/controller.py
  20. 98
      server/daemon.py
  21. 679
      server/db.py
  22. 39
      server/env.py
  23. 216
      server/protocol.py
  24. 174
      server/server.py
  25. 1
      server/version.py
  26. 12
      server_main.py

63
HOWTO.rst

@ -6,14 +6,15 @@ successfully on MaxOSX and DragonFlyBSD. It won't run out-of-the-box
on Windows, but the changes required to make it do so should be
small - patches welcome.
+ Python3: ElectrumX makes heavy use of asyncio so version >=3.5 is required
+ Python3: ElectrumX uses asyncio. Python version >=3.5 is required.
+ plyvel: Python interface to LevelDB. I am using plyvel-0.9.
+ aiohttp: Python library for asynchronous HTTP. ElectrumX uses it for
communication with the daemon. I am using aiohttp-0.21.
communication with the daemon. Version >= 1.0 required; I am
using 1.0.5.
While not requirements for running ElectrumX, it is intended to be run
with supervisor software such as Daniel Bernstein's daemontools, or
Gerald Pape's runit package. These make administration of secure
with supervisor software such as Daniel Bernstein's daemontools,
Gerald Pape's runit package or systemd. These make administration of secure
unix servers very easy, and I strongly recommend you install one of these
and familiarise yourself with them. The instructions below and sample
run scripts assume daemontools; adapting to runit should be trivial
@ -55,6 +56,10 @@ on an SSD::
mkdir /path/to/db_directory
chown electrumx /path/to/db_directory
Using daemontools
-----------------
Next create a daemontools service directory; this only holds symlinks
(see daemontools documentation). The 'svscan' program will ensure the
servers in the directory are running by launching a 'supervise'
@ -107,6 +112,34 @@ You can see its logs with::
tail -F /path/to/log/dir/current | tai64nlocal
Using systemd
-------------
This repository contains a sample systemd unit file that you can use to
setup ElectrumX with systemd. Simply copy it to :code:`/etc/systemd/system`::
cp samples/systemd-unit /etc/systemd/system/electrumx.service
The sample unit file assumes that the repository is located at
:code:`/home/electrumx/electrumx`. If that differs on your system, you need to
change the unit file accordingly.
You need to set a few configuration variables in :code:`/etc/electrumx.conf`,
see `samples/NOTES` for the list of required variables.
Now you can start ElectrumX using :code:`systemctl`::
systemctl start electrumx
You can use :code:`journalctl` to check the log output::
journalctl -u electrumx -f
Once configured, you may want to start ElectrumX at boot::
systemctl enable electrumx
Sync Progress
=============
@ -127,13 +160,14 @@ Here is my experience with the current codebase, to given heights and
rough wall-time::
Machine A Machine B DB + Metadata
180,000 7m 10s 0.4 GiB
245,800 1h 00m 2.7 GiB
290,000 1h 56m 3.3 GiB
343,000 3h 56m 6.0 GiB
386,000 7h 28m 7.0 GiB
404,000 9h 41m
434,369 14h 38m 17.1 GiB
181,000 7m 09s 0.4 GiB
255,000 1h 02m 2.7 GiB
289,000 1h 46m 3.3 GiB
317,000 2h 33m
351,000 3h 58m
377,000 6h 06m 6.5 GiB
403,400 8h 51m
436,196 14h 03m 17.3 GiB
Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB
RAM and a DragonFlyBSD HAMMER fileystem on an SSD. It requests blocks
@ -141,7 +175,7 @@ over the LAN from a bitcoind on machine B.
Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on
the same machine. HIST_MB of 400, CACHE_MB of 2,000.
the same machine. HIST_MB of 350, UTXO_MB of 1,600.
For chains other than bitcoin-mainnet sychronization should be much
faster.
@ -158,7 +192,7 @@ by bringing it down like so::
If processing the blockchain the server will start the process of
flushing to disk. Once that is complete the server will exit. Be
patient as disk flushing can take a while.
patient as disk flushing can take many minutes.
ElectrumX flushes to leveldb using its transaction functionality. The
plyvel documentation claims this is atomic. I have written ElectrumX
@ -228,4 +262,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
timing out the connection while the disk flush was in progress. This
is harmless.
The ETA is just a guide and can be quite volatile.
The ETA is just a guide and can be quite volatile. It is too optimistic
initially.

24
README.rst

@ -47,33 +47,28 @@ faster?
All of the following likely play a part:
- aggressive caching and batching of DB writes
- more compact representation of UTXOs, the mp address index, and
- more compact representation of UTXOs, the address index, and
history. Electrum server stores full transaction hash and height
for all UTXOs. In its pruned history it does the same. ElectrumX
just stores the transaction number in the linear history of
transactions, and it looks like that for at least 5 years that will
fit in a 4-byte integer. ElectrumX calculates the height from a
simple lookup in a linear array which is stored on disk. ElectrumX
also stores transaction hashes in a linear array on disk.
transactions. For at least another 5 years the transaction number
will fit in a 4-byte integer. ElectrumX calculates the height from
a simple lookup in a linear array which is stored on disk.
ElectrumX also stores transaction hashes in a linear array on disk.
- storing static append-only metadata which is indexed by position on
disk rather than in levelDB. It would be nice to do this for histories
but I cannot think how they could be easily indexable on a filesystem.
- avoiding unnecessary or redundant computations
- more efficient memory usage
- asyncio and asynchronous prefetch of blocks. With luck ElectrumX
will have no need of threads or locking primitives
- because it prunes electrum-server needs to store undo information,
ElectrumX should does not need to store undo information for
blockchain reorganisations (note blockchain reorgs are not yet
implemented in ElectrumX)
- asyncio and asynchronous prefetch of blocks. ElectrumX should not
have any need of threads.
Roadmap
=======
- test a few more performance improvement ideas
- handle blockchain reorgs
- handle client connections
- handle client connections (half-implemented but not functional)
- potentially move some functionality to C or C++
Once I get round to writing the server part, I will add DoS
@ -102,12 +97,13 @@ As I've been researching where the time is going during block chain
indexing and how various cache sizes and hardware choices affect it,
I'd appreciate it if anyone trying to synchronize could tell me::
- the version of ElectrumX
- their O/S and filesystem
- their hardware (CPU name and speed, RAM, and disk kind)
- whether their daemon was on the same host or not
- whatever stats about sync height vs time they can provide (the
logs give it all in wall time)
- the network they synced
- the network (e.g. bitcoin mainnet) they synced
Neil Booth

11
RELEASE-NOTES

@ -0,0 +1,11 @@
Version 0.02
------------
- fix bug where tx counts were incorrectly saved
- large clean-up and refactoring of code, breakout into new files
- several efficiency improvements
- initial implementation of chain reorg handling
- work on RPC and TCP server functionality. Code committed but not
functional, so currently disabled
- note that some of the enivronment variables have been renamed,
see samples/scripts/NOTES for the list

62
electrumx_rpc.py

@ -0,0 +1,62 @@
#!/usr/bin/env python3
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import argparse
import asyncio
import json
from functools import partial
from os import environ
class RPCClient(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
def connection_made(self, transport):
self.transport = transport
def connection_lost(self, exc):
self.loop.stop()
def send(self, payload):
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
def data_received(self, data):
payload = json.loads(data.decode())
self.transport.close()
print(json.dumps(payload, indent=4, sort_keys=True))
def main():
'''Send the RPC command to the server and print the result.'''
parser = argparse.ArgumentParser('Send electrumx an RPC command' )
parser.add_argument('-p', '--port', metavar='port_num', type=int,
help='RPC port number')
parser.add_argument('command', nargs='*', default=[],
help='command to send')
args = parser.parse_args()
if args.port is None:
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
payload = {'method': args.command[0], 'params': args.command[1:]}
loop = asyncio.get_event_loop()
proto_factory = partial(RPCClient, loop)
coro = loop.create_connection(proto_factory, 'localhost', args.port)
try:
transport, protocol = loop.run_until_complete(coro)
protocol.send(payload)
loop.run_forever()
except OSError:
print('error connecting - is ElectrumX running?')
finally:
loop.close()
if __name__ == '__main__':
main()

13
lib/coins.py

@ -6,7 +6,7 @@ from decimal import Decimal
import inspect
import sys
from lib.hash import Base58, hash160
from lib.hash import Base58, hash160, double_sha256
from lib.script import ScriptPubKey
from lib.tx import Deserializer
@ -135,10 +135,17 @@ class Coin(object):
payload.append(0x01)
return Base58.encode_check(payload)
@classmethod
def header_hashes(cls, header):
'''Given a header return the previous block hash and the current block
hash.'''
return header[4:36], double_sha256(header)
@classmethod
def read_block(cls, block):
d = Deserializer(block[cls.HEADER_LEN:])
return d.read_block()
'''Read a block and return (header, tx_hashes, txs)'''
header, rest = block[:cls.HEADER_LEN], block[cls.HEADER_LEN:]
return (header, ) + Deserializer(rest).read_block()
@classmethod
def decimal_value(cls, value):

10
lib/hash.py

@ -30,6 +30,16 @@ def hmac_sha512(key, msg):
def hash160(x):
return ripemd160(sha256(x))
def hash_to_str(x):
'''Converts a big-endian binary hash to a little-endian hex string, as
shown in block explorers, etc.
'''
return bytes(reversed(x)).hex()
def hex_str_to_hash(x):
'''Converts a little-endian hex string as shown to a big-endian binary
hash.'''
return bytes(reversed(bytes.fromhex(x)))
class InvalidBase58String(Exception):
pass

47
lib/tx.py

@ -2,11 +2,10 @@
# and warranty status of this software.
from collections import namedtuple
import binascii
import struct
from lib.util import cachedproperty
from lib.hash import double_sha256
from lib.hash import double_sha256, hash_to_str
class Tx(namedtuple("Tx", "version inputs outputs locktime")):
@ -15,17 +14,17 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")):
def is_coinbase(self):
return self.inputs[0].is_coinbase
OutPoint = namedtuple("OutPoint", "hash n")
# FIXME: add hash as a cached property?
# prevout is an OutPoint object
class TxInput(namedtuple("TxInput", "prevout script sequence")):
class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
ZERO = bytes(32)
MINUS_1 = 4294967295
@cachedproperty
def is_coinbase(self):
return self.prevout == (TxInput.ZERO, TxInput.MINUS_1)
return (self.prev_hash == TxInput.ZERO
and self.prev_idx == TxInput.MINUS_1)
@cachedproperty
def script_sig_info(self):
@ -34,11 +33,11 @@ class TxInput(namedtuple("TxInput", "prevout script sequence")):
return None
return Script.parse_script_sig(self.script)
def __repr__(self):
script = binascii.hexlify(self.script).decode("ascii")
prev_hash = binascii.hexlify(self.prevout.hash).decode("ascii")
return ("Input(prevout=({}, {:d}), script={}, sequence={:d})"
.format(prev_hash, self.prevout.n, script, self.sequence))
def __str__(self):
script = self.script.hex()
prev_hash = hash_to_str(self.prev_hash)
return ("Input({}, {:d}, script={}, sequence={:d})"
.format(prev_hash, self.prev_idx, script, self.sequence))
class TxOutput(namedtuple("TxOutput", "value pk_script")):
@ -56,11 +55,12 @@ class Deserializer(object):
self.cursor = 0
def read_tx(self):
version = self.read_le_int32()
inputs = self.read_inputs()
outputs = self.read_outputs()
locktime = self.read_le_uint32()
return Tx(version, inputs, outputs, locktime)
return Tx(
self.read_le_int32(), # version
self.read_inputs(), # inputs
self.read_outputs(), # outputs
self.read_le_uint32() # locktime
)
def read_block(self):
tx_hashes = []
@ -81,15 +81,12 @@ class Deserializer(object):
return [self.read_input() for i in range(n)]
def read_input(self):
prevout = self.read_outpoint()
script = self.read_varbytes()
sequence = self.read_le_uint32()
return TxInput(prevout, script, sequence)
def read_outpoint(self):
hash = self.read_nbytes(32)
n = self.read_le_uint32()
return OutPoint(hash, n)
return TxInput(
self.read_nbytes(32), # prev_hash
self.read_le_uint32(), # prev_idx
self.read_varbytes(), # script
self.read_le_uint32() # sequence
)
def read_outputs(self):
n = self.read_varint()

8
lib/util.py

@ -2,10 +2,18 @@
# and warranty status of this software.
import array
import logging
import sys
from collections import Container, Mapping
class LoggedClass(object):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.INFO)
# Method decorator. To be used for calculations that will always
# deliver the same result. The method cannot take any arguments
# and should be accessed as an attribute.

37
query.py

@ -7,14 +7,35 @@ import os
import sys
from server.env import Env
from server.db import DB
from server.block_processor import BlockProcessor
from lib.hash import hash_to_str
def count_entries(db):
utxos = 0
for key in db.iterator(prefix=b'u', include_value=False):
utxos += 1
print("UTXO count:", utxos)
hash168 = 0
for key in db.iterator(prefix=b'h', include_value=False):
hash168 += 1
print("Hash168 count:", hash168)
hist = 0
for key in db.iterator(prefix=b'H', include_value=False):
hist += 1
print("History addresses:", hist)
def main():
env = Env()
coin = env.coin
os.chdir(env.db_dir)
db = DB(env)
coin = db.coin
bp = BlockProcessor(env, None)
if len(sys.argv) == 1:
count_entries(bp.db)
return
argc = 1
try:
limit = int(sys.argv[argc])
@ -25,19 +46,19 @@ def main():
print('Address: ', addr)
hash168 = coin.address_to_hash168(addr)
n = None
for n, (tx_hash, height) in enumerate(db.get_history(hash168, limit)):
for n, (tx_hash, height) in enumerate(bp.get_history(hash168, limit)):
print('History #{:d}: hash: {} height: {:d}'
.format(n + 1, bytes(reversed(tx_hash)).hex(), height))
.format(n + 1, hash_to_str(tx_hash), height))
if n is None:
print('No history')
n = None
for n, utxo in enumerate(db.get_utxos(hash168, limit)):
for n, utxo in enumerate(bp.get_utxos(hash168, limit)):
print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}'
.format(n + 1, bytes(reversed(utxo.tx_hash)).hex(),
.format(n + 1, hash_to_str(utxo.tx_hash),
utxo.tx_pos, utxo.height, utxo.value))
if n is None:
print('No UTXOs')
balance = db.get_balance(hash168)
balance = bp.get_balance(hash168)
print('Balance: {} {}'.format(coin.decimal_value(balance),
coin.SHORTNAME))

22
samples/scripts/NOTES

@ -5,6 +5,11 @@ NETWORK - see lib/coins.py, must be a coin NET
DB_DIRECTORY - path to the database directory (if relative, to run script)
USERNAME - the username the server will run as
SERVER_MAIN - path to the server_main.py script (if relative, to run script)
DAEMON_URL - the URL used to connect to the daemon. Should be of the form
http://username:password@hostname:port/
Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
DAEMON_HOST and DAEMON_PORT. DAEMON_PORT is optional and
will default appropriately for COIN.
In addition either RPC_URL must be given as the full RPC URL for
connecting to the daemon, or you must specify RPC_HOST, RPC_USER,
@ -14,6 +19,11 @@ the coin and network otherwise).
The other environment variables are all optional and will adopt
sensible defaults if not specified.
REORG_LIMIT - maximum number of blocks to be able to handle in a chain
reorganisation. ElectrumX retains some fairly compact
undo information for this many blocks in levelDB.
Default is 200.
Your performance might change by tweaking these cache settings. Cache
size is only checked roughly every minute, so the caches can grow
beyond the specified size. Also the Python process is often quite a
@ -25,14 +35,14 @@ HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger
as history is append-only and not searched.
CACHE_MB- amount of UTXO and history cache, in MB, to retain before
UTXO_MB - amount of UTXO and history cache, in MB, to retain before
flushing to disk. Default is 1000. This may be too large
for small boxes or too small for machines with lots of RAM.
Larger caches generally perform better as there is
significant searching of the UTXO cache during indexing.
However, I don't see much benefit in my tests pushing this
beyond 2000, and in fact beyond there performance begins to
fall. My machine has 24GB RAM; the slow down is probably
because of leveldb caching and Python GC effects. However
this may be very dependent on hardware and you may have
different results.
too high, and in fact performance begins to fall. My
machine has 24GB RAM; the slow down is probably because of
leveldb caching and Python GC effects. However this may be
very dependent on hardware and you may have different
results.

0
samples/scripts/env/RPC_HOST → samples/scripts/env/DAEMON_HOST

0
samples/scripts/env/RPC_PASSWORD → samples/scripts/env/DAEMON_PASSWORD

0
samples/scripts/env/RPC_PORT → samples/scripts/env/DAEMON_PORT

0
samples/scripts/env/RPC_USERNAME → samples/scripts/env/DAEMON_USERNAME

0
samples/scripts/env/CACHE_MB → samples/scripts/env/UTXO_MB

11
samples/systemd-unit

@ -0,0 +1,11 @@
[Unit]
Description=Electrumx
After=network.target
[Service]
EnvironmentFile=/etc/electrumx.conf
ExecStart=/home/electrumx/electrumx/server_main.py
User=electrumx
[Install]
WantedBy=multi-user.target

714
server/block_processor.py

@ -0,0 +1,714 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import array
import ast
import asyncio
import struct
import time
from bisect import bisect_left
from collections import defaultdict, namedtuple
from functools import partial
import plyvel
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import DaemonError
from lib.hash import hash_to_str
from lib.script import ScriptPubKey
from lib.util import chunks, LoggedClass
def formatted_time(t):
'''Return a number of seconds as a string in days, hours, mins and
secs.'''
t = int(t)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class ChainError(Exception):
pass
class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).'''
def __init__(self, daemon, height):
super().__init__()
self.daemon = daemon
self.semaphore = asyncio.Semaphore()
self.queue = asyncio.Queue()
self.queue_size = 0
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
self.fetched_height = height
self.recent_sizes = [0]
async def get_blocks(self):
'''Returns a list of prefetched blocks.'''
blocks, total_size = await self.queue.get()
self.queue_size -= total_size
return blocks
async def clear(self, height):
'''Clear prefetched blocks and restart from the given height.
Used in blockchain reorganisations. This coroutine can be
called asynchronously to the _prefetch coroutine so we must
synchronize.
'''
with await self.semaphore:
while not self.queue.empty():
self.queue.get_nowait()
self.queue_size = 0
self.fetched_height = height
async def start(self):
'''Loop forever polling for more blocks.'''
self.logger.info('prefetching blocks...')
while True:
while self.queue_size < self.target_cache_size:
try:
with await self.semaphore:
await self._prefetch()
except DaemonError as e:
self.logger.info('ignoring daemon errors: {}'.format(e))
await asyncio.sleep(2)
def _prefill_count(self, room):
ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
count = room // ave_size if ave_size else 0
return max(count, 10)
async def _prefetch(self):
'''Prefetch blocks if there are any to prefetch.'''
daemon_height = await self.daemon.height()
max_count = min(daemon_height - self.fetched_height, 4000)
count = min(max_count, self._prefill_count(self.target_cache_size))
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if not hex_hashes:
return
blocks = await self.daemon.raw_blocks(hex_hashes)
sizes = [len(block) for block in blocks]
total_size = sum(sizes)
self.queue.put_nowait((blocks, total_size))
self.queue_size += total_size
self.fetched_height += len(blocks)
# Keep 50 most recent block sizes for fetch count estimation
self.recent_sizes.extend(sizes)
excess = len(self.recent_sizes) - 50
if excess > 0:
self.recent_sizes = self.recent_sizes[excess:]
class BlockProcessor(LoggedClass):
'''Process blocks and update the DB state to match.
Employ a prefetcher to prefetch blocks in batches for processing.
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon):
super().__init__()
self.daemon = daemon
# Meta
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.coin = env.coin
self.caught_up = False
self.reorg_limit = env.reorg_limit
# Chain state (initialize to genesis in case of new DB)
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin)
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.backup_hash168s = set()
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
# Redirected member func
self.get_tx_hash = self.fs_cache.get_tx_hash
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} utxo flush count: {:,d} '
'sync time: {}'
.format(self.coin.NAME, self.coin.NET, self.height,
self.tx_count, self.flush_count,
self.utxo_flush_count,
formatted_time(self.wall_time)))
self.logger.info('reorg limit of {:,d} blocks'
.format(self.reorg_limit))
self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
self.clean_db()
def coros(self, force_backup=False):
if force_backup:
return [self.force_chain_reorg(True), self.prefetcher.start()]
else:
return [self.start(), self.prefetcher.start()]
async def start(self):
'''External entry point for block processing.
A simple wrapper that safely flushes the DB on clean
shutdown.
'''
try:
await self.advance_blocks()
finally:
self.flush(True)
async def advance_blocks(self):
'''Loop forever processing blocks in the forward direction.'''
while True:
blocks = await self.prefetcher.get_blocks()
for block in blocks:
if not self.advance_block(block):
await self.handle_chain_reorg()
self.caught_up = False
break
await asyncio.sleep(0) # Yield
if self.height != self.daemon.cached_height():
continue
if not self.caught_up:
self.caught_up = True
self.logger.info('caught up to height {:,d}'
.format(self.height))
# Flush everything when in caught-up state as queries
# are performed on DB not in-memory
self.flush(True)
async def force_chain_reorg(self, to_genesis):
try:
await self.handle_chain_reorg(to_genesis)
finally:
self.flush(True)
async def handle_chain_reorg(self, to_genesis=False):
# First get all state on disk
self.logger.info('chain reorg detected')
self.flush(True)
self.logger.info('finding common height...')
hashes = await self.reorg_hashes(to_genesis)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks)
self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
async def reorg_hashes(self, to_genesis):
'''Return the list of hashes to back up beacuse of a reorg.
The hashes are returned in order of increasing height.'''
def match_pos(hashes1, hashes2):
for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)):
if hash1 == hash2:
return n
return -1
start = self.height - 1
count = 1
while start > 0:
self.logger.info('start: {:,d} count: {:,d}'.format(start, count))
hashes = self.fs_cache.block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = match_pos(hex_hashes, d_hex_hashes)
if n >= 0 and not to_genesis:
start += n + 1
break
count = min(count * 2, start)
start -= count
# Hashes differ from height 'start'
count = (self.height - start) + 1
self.logger.info('chain was reorganised for {:,d} blocks from '
'height {:,d} to height {:,d}'
.format(count, start, start + count - 1))
return self.fs_cache.block_hashes(start, count)
def open_db(self, coin):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
try:
db = plyvel.DB(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except:
db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
self.logger.info('created new database {}'.format(db_name))
else:
self.logger.info('successfully opened database {}'.format(db_name))
self.read_state(db)
return db
def read_state(self, db):
state = db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise ChainError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
def clean_db(self):
'''Clean out stale DB items.
Stale DB items are excess history flushed since the most
recent UTXO flush (only happens on unclean shutdown), and aged
undo information.
'''
if self.flush_count < self.utxo_flush_count:
raise ChainError('DB corrupt: flush_count < utxo_flush_count')
with self.db.write_batch(transaction=True) as batch:
if self.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
self.remove_excess_history(batch)
self.utxo_flush_count = self.flush_count
self.remove_stale_undo_items(batch)
self.flush_state(batch)
def remove_excess_history(self, batch):
prefix = b'H'
unpack = struct.unpack
keys = []
for key, hist in self.db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def remove_stale_undo_items(self, batch):
prefix = b'U'
unpack = struct.unpack
cutoff = self.db_height - self.reorg_limit
keys = []
for key, hist in self.db.iterator(prefix=prefix):
height, = unpack('>I', key[-4:])
if height > cutoff:
break
keys.append(key)
self.logger.info('deleting {:,d} stale undo entries'
.format(len(keys)))
for key in keys:
batch.delete(key)
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.tx_count
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
}
batch.put(b'state', repr(state).encode())
def flush_utxos(self, batch):
self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks'
.format(self.tx_count - self.db_tx_count,
self.height - self.db_height))
self.utxo_cache.flush(batch)
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
def assert_flushed(self):
'''Asserts state is fully flushed.'''
assert self.tx_count == self.db_tx_count
assert not self.history
assert not self.utxo_cache.cache
assert not self.utxo_cache.db_cache
assert not self.backup_hash168s
def flush(self, flush_utxos=False):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height:
self.logger.info('nothing to flush')
self.assert_flushed()
return
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count
# Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to
# have updated the DB.
if self.height > self.db_height:
self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch(transaction=True) as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
if self.height > self.db_height:
self.flush_history(batch)
else:
self.backup_history(batch)
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('committing transaction...')
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.db)
flush_time = int(self.last_flush - flush_start)
self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s'
.format(self.flush_count, self.height, self.tx_count,
flush_time))
# Catch-up stats
if not self.caught_up and tx_diff > 0:
daemon_height = self.daemon.cached_height()
txs_per_sec = int(self.tx_count / self.wall_time)
this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
if self.height > self.coin.TX_COUNT_HEIGHT:
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
else:
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
* self.coin.TX_PER_BLOCK
+ (self.coin.TX_COUNT - self.tx_count))
self.logger.info('tx/sec since genesis: {:,d}, '
'since last flush: {:,d}'
.format(txs_per_sec, this_txs_per_sec))
self.logger.info('sync time: {} ETA: {}'
.format(formatted_time(self.wall_time),
formatted_time(tx_est / this_txs_per_sec)))
def flush_history(self, batch):
self.logger.info('flushing history')
assert not self.backup_hash168s
self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count)
for hash168, hist in self.history.items():
key = b'H' + hash168 + flush_id
batch.put(key, hist.tobytes())
self.logger.info('{:,d} history entries in {:,d} addrs'
.format(self.history_size, len(self.history)))
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def backup_history(self, batch):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
# Drop any NO_CACHE entry
self.backup_hash168s.discard(NO_CACHE_ENTRY)
assert not self.history
nremoves = 0
for hash168 in sorted(self.backup_hash168s):
prefix = b'H' + hash168
deletes = []
puts = {}
for key, hist in self.db.iterator(reverse=True, prefix=prefix):
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= self.tx_count
idx = bisect_left(a, self.tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
break
deletes.append(key)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.logger.info('removed {:,d} history entries from {:,d} addresses'
.format(nremoves, len(self.backup_hash168s)))
self.backup_hash168s = set()
def cache_sizes(self):
'''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and
# requesting size from Python (see deep_getsizeof). For
# whatever reason Python O/S mem usage is typically +30% or
# more, so we scale our already bloated object sizes.
one_MB = int(1048576 / 1.3)
utxo_cache_size = len(self.utxo_cache.cache) * 187
db_cache_size = len(self.utxo_cache.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
hist_MB = hist_cache_size // one_MB
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
.format(self.height, self.daemon.cached_height()))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size {:,d}'
.format(len(self.utxo_cache.cache),
len(self.utxo_cache.db_cache),
len(self.history),
self.history_size))
self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
.format(utxo_MB + hist_MB, utxo_MB, hist_MB))
return utxo_MB, hist_MB
def undo_key(self, height):
'''DB key for undo information at the given height.'''
return b'U' + struct.pack('>I', height)
def write_undo_info(self, height, undo_info):
'''Write out undo information for the current height.'''
self.db.put(self.undo_key(height), undo_info)
def read_undo_info(self, height):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def advance_block(self, block):
# We must update the fs_cache before calling advance_txs() as
# the UTXO cache uses the fs_cache via get_tx_hash() to
# resolve compressed key collisions
header, tx_hashes, txs = self.coin.read_block(block)
self.fs_cache.advance_block(header, tx_hashes, txs)
prev_hash, header_hash = self.coin.header_hashes(header)
if prev_hash != self.tip:
return False
self.tip = header_hash
self.height += 1
undo_info = self.advance_txs(tx_hashes, txs)
if self.daemon.cached_height() - self.height <= self.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info))
# Check if we're getting full and time to flush?
now = time.time()
if now > self.next_cache_check:
self.next_cache_check = now + 60
utxo_MB, hist_MB = self.cache_sizes()
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(utxo_MB >= self.utxo_MB)
return True
def advance_txs(self, tx_hashes, txs):
put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
undo_info = []
# Use local vars for speed in the loops
history = self.history
tx_num = self.tx_count
coin = self.coin
parse_script = ScriptPubKey.from_script
pack = struct.pack
for tx, tx_hash in zip(txs, tx_hashes):
hash168s = set()
tx_numb = pack('<I', tx_num)
# Spend the inputs
if not tx.is_coinbase:
for txin in tx.inputs:
cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
undo_info.append(cache_value)
hash168s.add(cache_value[:21])
# Add the new UTXOs
for idx, txout in enumerate(tx.outputs):
# Get the hash168. Ignore scripts we can't grok.
hash168 = parse_script(txout.pk_script, coin).hash168
if hash168:
hash168s.add(hash168)
put_utxo(tx_hash + pack('<H', idx),
hash168 + tx_numb + pack('<Q', txout.value))
# Drop any NO_CACHE entry
hash168s.discard(NO_CACHE_ENTRY)
for hash168 in hash168s:
history[hash168].append(tx_num)
self.history_size += len(hash168s)
tx_num += 1
self.tx_count = tx_num
return undo_info
def backup_blocks(self, blocks):
'''Backup the blocks and flush.
The blocks should be in order of decreasing height.
A flush is performed once the blocks are backed up.
'''
self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed()
for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
if header_hash != self.tip:
raise ChainError('backup block {} is not tip {} at height {:,d}'
.format(hash_to_str(header_hash),
hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs)
self.fs_cache.backup_block()
self.tip = prev_hash
self.height -= 1
self.logger.info('backed up to height {:,d}'.format(self.height))
self.flush(True)
def backup_txs(self, tx_hashes, txs):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
n = len(undo_info)
# Use local vars for speed in the loops
pack = struct.pack
put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
hash168s = self.backup_hash168s
rtxs = reversed(txs)
rtx_hashes = reversed(tx_hashes)
for tx_hash, tx in zip(rtx_hashes, rtxs):
# Spend the outputs
for idx, txout in enumerate(tx.outputs):
cache_value = spend_utxo(tx_hash, idx)
hash168s.add(cache_value[:21])
# Restore the inputs
if not tx.is_coinbase:
for txin in reversed(tx.inputs):
n -= 33
undo_item = undo_info[n:n+33]
put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
undo_item)
hash168s.add(undo_item[:21])
assert n == 0
self.tx_count -= len(txs)
@staticmethod
def resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.get_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
utxos = []
for k, v in self.db.iterator(prefix=prefix):
(tx_pos, ) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num, ) = unpack('<I', v[n:n+4])
(value, ) = unpack('<Q', v[n+4:n+12])
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_current_header(self):
'''Returns the current header as a dictionary.'''
return self.fs_cache.encode_header(self.height)

400
server/cache.py

@ -0,0 +1,400 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import array
import itertools
import os
import struct
from bisect import bisect_right
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256, hash_to_str
# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
ADDR_TX_HASH_LEN = 4
UTXO_TX_HASH_LEN = 4
NO_HASH_168 = bytes([255]) * 21
NO_CACHE_ENTRY = NO_HASH_168 + bytes(12)
class UTXOCache(LoggedClass):
'''An in-memory UTXO cache, representing all changes to UTXO state
since the last DB flush.
We want to store millions, perhaps 10s of millions of these in
memory for optimal performance during initial sync, because then
it is possible to spend UTXOs without ever going to the database
(other than as an entry in the address history, and there is only
one such entry per TX not per UTXO). So store them in a Python
dictionary with binary keys and values.
Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes)
Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes)
That's 67 bytes of raw data. Python dictionary overhead means
each entry actually uses about 187 bytes of memory. So almost
11.5 million UTXOs can fit in 2GB of RAM. There are approximately
42 million UTXOs on bitcoin mainnet at height 433,000.
Semantics:
add: Add it to the cache dictionary.
spend: Remove it if in the cache dictionary.
Otherwise it's been flushed to the DB. Each UTXO
is responsible for two entries in the DB stored using
compressed keys. Mark both for deletion in the next
flush of the in-memory UTXO cache.
A UTXO is stored in the DB in 2 "tables":
1. The output value and tx number. Must be keyed with a
hash168 prefix so the unspent outputs and balance of an
arbitrary address can be looked up with a simple key
traversal.
Key: b'u' + hash168 + compressed_tx_hash + tx_idx
Value: a (tx_num, value) pair
2. Given a prevout, we need to be able to look up the UTXO key
to remove it. As is keyed by hash168 and that is not part
of the prevout, we need a hash168 lookup.
Key: b'h' + compressed tx_hash + tx_idx
Value: (hash168, tx_num) pair
The compressed TX hash is just the first few bytes of the hash of
the TX the UTXO is in (and needn't be the same number of bytes in
each table). As this is not unique there will be collisions;
tx_num is stored to resolve them. The collision rate is around
0.02% for the hash168 table, and almost zero for the UTXO table
(there are around 100 collisions in the whole bitcoin blockchain).
'''
def __init__(self, parent, db, coin):
super().__init__()
self.parent = parent
self.coin = coin
self.cache = {}
self.put = self.cache.__setitem__
self.db = db
self.db_cache = {}
# Statistics
self.cache_spends = 0
self.db_deletes = 0
def spend(self, prev_hash, prev_idx):
'''Spend a UTXO and return the cache's value.
If the UTXO is not in the cache it must be on disk.
'''
# Fast track is it's in the cache
pack = struct.pack
idx_packed = pack('<H', prev_idx)
value = self.cache.pop(prev_hash + idx_packed, None)
if value:
self.cache_spends += 1
return value
# Oh well. Find and remove it from the DB.
hash168 = self.hash168(prev_hash, idx_packed)
if not hash168:
return NO_CACHE_ENTRY
self.db_deletes += 1
# Read the UTXO through the cache from the disk. We have to
# go through the cache because compressed keys can collide.
key = b'u' + hash168 + prev_hash[:UTXO_TX_HASH_LEN] + idx_packed
data = self.cache_get(key)
if data is None:
# Uh-oh, this should not happen...
self.logger.error('found no UTXO for {} / {:d} key {}'
.format(hash_to_str(prev_hash), prev_idx,
bytes(key).hex()))
return NO_CACHE_ENTRY
if len(data) == 12:
self.cache_delete(key)
return hash168 + data
# Resolve the compressed key collison. These should be
# extremely rare.
assert len(data) % 12 == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.parent.get_tx_hash(tx_num)
if prev_hash == tx_hash:
result = hash168 + data[n: n+12]
data = data[:n] + data[n+12:]
self.cache_write(key, data)
return result
raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx_packed):
'''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is
indicates a non-standard script).
'''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
data = self.cache_get(key)
if data is None:
# Assuming the DB is not corrupt, this indicates a
# successful spend of a non-standard script
return None
if len(data) == 25:
self.cache_delete(key)
return data[:21]
assert len(data) % 25 == 0
# Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num)
if my_hash == tx_hash:
self.cache_write(key, data[:n] + data[n+25:])
return data[n:n+21]
raise Exception('could not resolve hash168 collision')
def cache_write(self, key, value):
'''Cache write of a (key, value) pair to the DB.'''
assert(bool(value))
self.db_cache[key] = value
def cache_delete(self, key):
'''Cache deletion of a key from the DB.'''
self.db_cache[key] = None
def cache_get(self, key):
'''Fetch a value from the DB through our write cache.'''
value = self.db_cache.get(key)
if value:
return value
return self.db.get(key)
def flush(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
hcolls = ucolls = 0
new_utxos = len(self.cache)
for cache_key, cache_value in self.cache.items():
# Frist write to the hash168 lookup table
key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:]
value = cache_value[:25]
prior_value = self.cache_get(key)
if prior_value: # Should rarely happen
hcolls += 1
value += prior_value
self.cache_write(key, value)
# Next write the UTXO table
key = (b'u' + cache_value[:21] + cache_key[:UTXO_TX_HASH_LEN]
+ cache_key[-2:])
value = cache_value[-12:]
prior_value = self.cache_get(key)
if prior_value: # Should almost never happen
ucolls += 1
value += prior_value
self.cache_write(key, value)
# GC-ing this now can only help the levelDB write.
self.cache = {}
self.put = self.cache.__setitem__
# Now we can update to the batch.
for key, value in self.db_cache.items():
if value:
batch.put(key, value)
else:
batch.delete(key)
self.db_cache = {}
adds = new_utxos + self.cache_spends
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
.format(adds, self.cache_spends))
self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. '
'Collisions: hash168: {:,d} UTXO: {:,d}'
.format(new_utxos, self.db_deletes,
hcolls, ucolls))
self.cache_spends = self.db_deletes = 0
class FSCache(LoggedClass):
def __init__(self, coin, height, tx_count):
super().__init__()
self.coin = coin
self.tx_hash_file_size = 16 * 1024 * 1024
assert self.tx_hash_file_size % 32 == 0
# On-disk values, updated by a flush
self.height = height
# Unflushed items
self.headers = []
self.tx_hashes = []
is_new = height == -1
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
if self.tx_counts:
assert tx_count == self.tx_counts[-1]
else:
assert tx_count == 0
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def advance_block(self, header, tx_hashes, txs):
'''Update the FS cache for a new block.'''
prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
# Cache the new header, tx hashes and cumulative tx count
self.headers.append(header)
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs))
def backup_block(self):
'''Revert a block.'''
assert not self.headers
assert not self.tx_hashes
assert self.height >= 0
# Just update in-memory. It doesn't matter if disk files are
# too long, they will be overwritten when advancing.
self.height -= 1
self.tx_counts.pop()
def flush(self, new_height, new_tx_count):
'''Flush the things stored on the filesystem.
The arguments are passed for sanity check assertions only.'''
self.logger.info('flushing to file system')
blocks_done = len(self.headers)
prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
txs_done = cur_tx_count - prior_tx_count
assert self.height + blocks_done == new_height
assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1
assert cur_tx_count == new_tx_count, \
'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
self.tx_hashes = []
self.headers = []
self.height += blocks_done
def read_headers(self, height, count):
read_count = min(count, self.height + 1 - height)
assert height >= 0 and read_count >= 0
assert count <= read_count + len(self.headers)
result = b''
if read_count > 0:
header_len = self.coin.HEADER_LEN
self.headers_file.seek(height * header_len)
result = self.headers_file.read(read_count * header_len)
count -= read_count
if count:
start = (height + read_count) - (self.height + 1)
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height > self.height:
tx_hashes = self.tx_hashes[height - (self.height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)
return tx_hash, height
def block_hashes(self, height, count):
headers = self.read_headers(height, count)
hlen = self.coin.HEADER_LEN
return [double_sha256(header) for header in chunks(headers, hlen)]
def encode_header(self, height):
if height < 0 or height > self.height + len(self.headers):
raise Exception('no header information for height {:,d}'
.format(height))
header = self.read_headers(self.height, 1)
unpack = struct.unpack
version, = unpack('<I', header[:4])
timestamp, bits, nonce = unpack('<III', header[68:80])
return {
'block_height': self.height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}

165
server/controller.py

@ -0,0 +1,165 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import asyncio
import signal
import traceback
from functools import partial
from server.daemon import Daemon, DaemonError
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC
from lib.hash import (sha256, double_sha256, hash_to_str,
Base58, hex_str_to_hash)
from lib.util import LoggedClass
class Controller(LoggedClass):
def __init__(self, loop, env):
'''Create up the controller.
Creates DB, Daemon and BlockProcessor instances.
'''
super().__init__()
self.loop = loop
self.env = env
self.daemon = Daemon(env.daemon_url)
self.block_processor = BlockProcessor(env, self.daemon)
self.servers = []
self.sessions = set()
self.addresses = {}
self.jobs = set()
self.peers = {}
def start(self):
'''Prime the event loop with asynchronous servers and jobs.'''
env = self.env
loop = self.loop
coros = self.block_processor.coros()
if False:
self.start_servers()
coros.append(self.reap_jobs())
for coro in coros:
asyncio.ensure_future(coro)
# Signal handlers
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
def start_servers(self):
protocol = partial(LocalRPC, self)
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(loop.run_until_complete(rpc_server))
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, self, self.daemon, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(loop.run_until_complete(tcp_server))
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
ssl_server = loop.create_server(protocol, env.host, env.ssl_port)
self.servers.append(loop.run_until_complete(ssl_server))
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
def on_signal(self, signame):
'''Call on receipt of a signal to cleanly shutdown.'''
self.logger.warning('received {} signal, preparing to shut down'
.format(signame))
for task in asyncio.Task.all_tasks(self.loop):
task.cancel()
def add_session(self, session):
self.sessions.add(session)
def remove_session(self, session):
self.sessions.remove(session)
def add_job(self, coro):
'''Queue a job for asynchronous processing.'''
self.jobs.add(asyncio.ensure_future(coro))
async def reap_jobs(self):
while True:
jobs = set()
for job in self.jobs:
if job.done():
try:
job.result()
except Exception as e:
traceback.print_exc()
else:
jobs.add(job)
self.logger.info('reaped {:d} jobs, {:d} jobs pending'
.format(len(self.jobs) - len(jobs), len(jobs)))
self.jobs = jobs
await asyncio.sleep(5)
def address_status(self, hash168):
'''Returns status as 32 bytes.'''
status = self.addresses.get(hash168)
if status is None:
history = self.block_processor.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
status = sha256(status.encode())
self.addresses[hash168] = status
return status
async def get_merkle(self, tx_hash, height):
'''tx_hash is a hex string.'''
block_hash = await self.daemon.send_single('getblockhash', (height,))
block = await self.daemon.send_single('getblock', (block_hash, True))
tx_hashes = block['tx']
# This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash)
idx = pos
hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
merkle_branch = []
while len(hashes) > 1:
if len(hashes) & 1:
hashes.append(hashes[-1])
idx = idx - 1 if (idx & 1) else idx + 1
merkle_branch.append(hash_to_str(hashes[idx]))
idx //= 2
hashes = [double_sha256(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
def get_peers(self):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return self.peers
def height(self):
return self.block_processor.height
def get_current_header(self):
return self.block_processor.get_current_header()
def get_history(self, hash168):
history = self.block_processor.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]

98
server/daemon.py

@ -0,0 +1,98 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
'''Classes for handling asynchronous connections to a blockchain
daemon.'''
import asyncio
import json
import aiohttp
from lib.util import LoggedClass
class DaemonError(Exception):
'''Raised when the daemon returns an error in its results that
cannot be remedied by retrying.'''
class Daemon(LoggedClass):
'''Handles connections to a daemon at the given URL.'''
def __init__(self, url):
super().__init__()
self.url = url
self._height = None
self.logger.info('connecting to daemon at URL {}'.format(url))
async def send_single(self, method, params=None):
payload = {'method': method}
if params:
payload['params'] = params
result, = await self.send((payload, ))
return result
async def send_many(self, mp_pairs):
if mp_pairs:
payload = [{'method': method, 'params': params}
for method, params in mp_pairs]
return await self.send(payload)
return []
async def send_vector(self, method, params_list):
if params_list:
payload = [{'method': method, 'params': params}
for params in params_list]
return await self.send(payload)
return []
async def send(self, payload):
assert isinstance(payload, (tuple, list))
data = json.dumps(payload)
while True:
try:
async with aiohttp.post(self.url, data=data) as resp:
result = await resp.json()
except asyncio.CancelledError:
raise
except Exception as e:
msg = 'aiohttp error: {}'.format(e)
secs = 3
else:
errs = tuple(item['error'] for item in result)
if not any(errs):
return tuple(item['result'] for item in result)
if any(err.get('code') == -28 for err in errs):
msg = 'daemon still warming up.'
secs = 30
else:
msg = '{}'.format(errs)
raise DaemonError(msg)
self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs))
await asyncio.sleep(secs)
async def block_hex_hashes(self, first, count):
'''Return the hex hashes of count block starting at height first.'''
param_lists = [[height] for height in range(first, first + count)]
return await self.send_vector('getblockhash', param_lists)
async def raw_blocks(self, hex_hashes):
'''Return the raw binary blocks with the given hex hashes.'''
param_lists = [(h, False) for h in hex_hashes]
blocks = await self.send_vector('getblock', param_lists)
# Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks]
async def height(self):
'''Query the daemon for its current height.'''
self._height = await self.send_single('getblockcount')
return self._height
def cached_height(self):
'''Return the cached daemon height.
If the daemon has not been queried yet this returns None.'''
return self._height

679
server/db.py

@ -1,679 +0,0 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import array
import ast
import itertools
import os
import struct
import time
from binascii import hexlify, unhexlify
from bisect import bisect_right
from collections import defaultdict, namedtuple
from functools import partial
import logging
import plyvel
from lib.coins import Bitcoin
from lib.script import ScriptPubKey
# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
ADDR_TX_HASH_LEN = 4
UTXO_TX_HASH_LEN = 4
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
def formatted_time(t):
t = int(t)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
class UTXOCache(object):
'''An in-memory UTXO cache, representing all changes to UTXO state
since the last DB flush.
We want to store millions, perhaps 10s of millions of these in
memory for optimal performance during initial sync, because then
it is possible to spend UTXOs without ever going to the database
(other than as an entry in the address history, and there is only
one such entry per TX not per UTXO). So store them in a Python
dictionary with binary keys and values.
Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes)
Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes)
That's 67 bytes of raw data. Python dictionary overhead means
each entry actually uses about 187 bytes of memory. So almost
11.5 million UTXOs can fit in 2GB of RAM. There are approximately
42 million UTXOs on bitcoin mainnet at height 433,000.
Semantics:
add: Add it to the cache dictionary.
spend: Remove it if in the cache dictionary.
Otherwise it's been flushed to the DB. Each UTXO
is responsible for two entries in the DB stored using
compressed keys. Mark both for deletion in the next
flush of the in-memory UTXO cache.
A UTXO is stored in the DB in 2 "tables":
1. The output value and tx number. Must be keyed with a
hash168 prefix so the unspent outputs and balance of an
arbitrary address can be looked up with a simple key
traversal.
Key: b'u' + hash168 + compressed_tx_hash + tx_idx
Value: a (tx_num, value) pair
2. Given a prevout, we need to be able to look up the UTXO key
to remove it. As is keyed by hash168 and that is not part
of the prevout, we need a hash168 lookup.
Key: b'h' + compressed tx_hash + tx_idx
Value: (hash168, tx_num) pair
The compressed TX hash is just the first few bytes of the hash of
the TX the UTXO is in (and needn't be the same number of bytes in
each table). As this is not unique there will be collisions;
tx_num is stored to resolve them. The collision rate is around
0.02% for the hash168 table, and almost zero for the UTXO table
(there are around 100 collisions in the whole bitcoin blockchain).
'''
def __init__(self, parent, db, coin):
self.logger = logging.getLogger('UTXO')
self.logger.setLevel(logging.INFO)
self.parent = parent
self.coin = coin
self.cache = {}
self.db = db
self.db_cache = {}
# Statistics
self.adds = 0
self.cache_hits = 0
self.db_deletes = 0
def add_many(self, tx_hash, tx_num, txouts):
'''Add a sequence of UTXOs to the cache, return the set of hash168s
seen.
Pass the hash of the TX it appears in, its TX number, and the
TX outputs.
'''
parse_script = ScriptPubKey.from_script
pack = struct.pack
tx_numb = pack('<I', tx_num)
hash168s = set()
self.adds += len(txouts)
for idx, txout in enumerate(txouts):
# Get the hash168. Ignore scripts we can't grok.
pk = parse_script(txout.pk_script, self.coin)
hash168 = pk.hash168
if not hash168:
continue
hash168s.add(hash168)
key = tx_hash + pack('<H', idx)
# Well-known duplicate coinbases from heights 91722-91880
# that destoyed 100 BTC forever:
# e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
# d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
#if key in self.cache:
# self.logger.info('duplicate tx hash {}'
# .format(bytes(reversed(tx_hash)).hex()))
# b''.join avoids this: https://bugs.python.org/issue13298
self.cache[key] = b''.join(
(hash168, tx_numb, pack('<Q', txout.value)))
return hash168s
def spend(self, prevout):
'''Spend a UTXO and return the address spent.
If the UTXO is not in the cache it must be on disk.
'''
# Fast track is it's in the cache
pack = struct.pack
key = b''.join((prevout.hash, pack('<H', prevout.n)))
value = self.cache.pop(key, None)
if value:
self.cache_hits += 1
return value[:21]
# Oh well. Find and remove it from the DB.
hash168 = self.hash168(prevout.hash, prevout.n)
if not hash168:
return None
self.db_deletes += 1
# Read the UTXO through the cache from the disk. We have to
# go through the cache because compressed keys can collide.
key = (b'u' + hash168 + prevout.hash[:UTXO_TX_HASH_LEN]
+ pack('<H', prevout.n))
data = self.cache_get(key)
if data is None:
# Uh-oh, this should not happen...
self.logger.error('found no UTXO for {} / {:d} key {}'
.format(bytes(reversed(prevout.hash)).hex(),
prevout.n, bytes(key).hex()))
return hash168
if len(data) == 12:
(tx_num, ) = struct.unpack('<I', data[:4])
self.cache_delete(key)
return hash168
# Resolve the compressed key collison. These should be
# extremely rare.
assert len(data) % 12 == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.parent.get_tx_hash(tx_num)
if prevout.hash == tx_hash:
data = data[:n] + data[n + 12:]
self.cache_write(key, data)
return hash168
raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx):
'''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is
indicates a non-standard script).
'''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx)
data = self.cache_get(key)
if data is None:
# Assuming the DB is not corrupt, this indicates a
# successful spend of a non-standard script
# self.logger.info('ignoring spend of non-standard UTXO {} / {:d}'
# .format(bytes(reversed(tx_hash)).hex(), idx)))
return None
if len(data) == 25:
self.cache_delete(key)
return data[:21]
assert len(data) % 25 == 0
# Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num)
if my_hash == tx_hash:
self.cache_write(key, data[:n] + data[n+25:])
return data[n:n+21]
raise Exception('could not resolve hash168 collision')
def cache_write(self, key, value):
'''Cache write of a (key, value) pair to the DB.'''
assert(bool(value))
self.db_cache[key] = value
def cache_delete(self, key):
'''Cache deletion of a key from the DB.'''
self.db_cache[key] = None
def cache_get(self, key):
'''Fetch a value from the DB through our write cache.'''
value = self.db_cache.get(key)
if value:
return value
return self.db.get(key)
def flush(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
hcolls = ucolls = 0
new_utxos = len(self.cache)
for cache_key, cache_value in self.cache.items():
# Frist write to the hash168 lookup table
key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:]
value = cache_value[:25]
prior_value = self.cache_get(key)
if prior_value: # Should rarely happen
hcolls += 1
value += prior_value
self.cache_write(key, value)
# Next write the UTXO table
key = (b'u' + cache_value[:21] + cache_key[:UTXO_TX_HASH_LEN]
+ cache_key[-2:])
value = cache_value[-12:]
prior_value = self.cache_get(key)
if prior_value: # Should almost never happen
ucolls += 1
value += prior_value
self.cache_write(key, value)
# GC-ing this now can only help the levelDB write.
self.cache = {}
# Now we can update to the batch.
for key, value in self.db_cache.items():
if value:
batch.put(key, value)
else:
batch.delete(key)
self.db_cache = {}
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
.format(self.adds, self.cache_hits))
self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. '
'Collisions: hash168: {:,d} UTXO: {:,d}'
.format(new_utxos, self.db_deletes,
hcolls, ucolls))
self.adds = self.cache_hits = self.db_deletes = 0
class DB(object):
class Error(Exception):
pass
def __init__(self, env):
self.logger = logging.getLogger('DB')
self.logger.setLevel(logging.INFO)
# Meta
self.tx_hash_file_size = 16 * 1024 * 1024
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.last_flush = time.time()
self.coin = env.coin
# Chain state (initialize to genesis in case of new DB)
self.db_height = -1
self.db_tx_count = 0
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.tip = self.coin.GENESIS_HASH
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin)
self.tx_count = self.fs_tx_count = self.db_tx_count
self.height = self.fs_height = self.db_height
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
self.headers = []
self.tx_hashes = []
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} utxo flush count: {:,d} '
'sync time: {}'
.format(self.coin.NAME, self.coin.NET, self.height,
self.tx_count, self.flush_count,
self.utxo_flush_count,
formatted_time(self.wall_time)))
self.logger.info('flushing UTXO cache at {:,d} MB'
.format(self.utxo_MB))
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
def open_db(self, coin):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
is_new = False
try:
db = plyvel.DB(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except:
db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
is_new = True
if is_new:
self.logger.info('created new database {}'.format(db_name))
self.flush_state(db)
else:
self.logger.info('successfully opened database {}'.format(db_name))
self.read_state(db)
self.delete_excess_history(db)
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
return db
def read_state(self, db):
state = db.get(b'state')
state = ast.literal_eval(state.decode('ascii'))
if state['genesis'] != self.coin.GENESIS_HASH:
raise self.Error('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
def delete_excess_history(self, db):
'''Clear history flushed since the most recent UTXO flush.'''
utxo_flush_count = self.utxo_flush_count
diff = self.flush_count - utxo_flush_count
if diff == 0:
return
if diff < 0:
raise self.Error('DB corrupt: flush_count < utxo_flush_count')
self.logger.info('DB not shut down cleanly. Scanning for most '
'recent {:,d} history flushes'.format(diff))
prefix = b'H'
unpack = struct.unpack
keys = []
for key, hist in db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'.format(len(keys)))
with db.write_batch(transaction=True) as batch:
for key in keys:
db.delete(key)
self.utxo_flush_count = self.flush_count
self.flush_state(batch)
self.logger.info('deletion complete')
def flush_to_fs(self):
'''Flush the things stored on the filesystem.'''
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
self.headers = []
# Then the tx counts
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:
self.height + 1])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert self.tx_hash_file_size % 32 == 0
cursor = 0
file_pos = self.fs_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
self.tx_hashes = []
self.fs_height = self.height
self.fs_tx_count = self.tx_count
os.sync()
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.tip,
'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
}
batch.put(b'state', repr(state).encode('ascii'))
def flush_utxos(self, batch):
self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks'
.format(self.tx_count - self.db_tx_count,
self.height - self.db_height))
self.utxo_cache.flush(batch)
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
def flush(self, daemon_height, flush_utxos=False):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.fs_tx_count
# Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to
# have updated the DB.
self.logger.info('commencing history flush')
self.flush_to_fs()
with self.db.write_batch(transaction=True) as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
self.flush_history(batch)
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
self.logger.info('committing transaction...')
# Update and put the wall time again - otherwise we drop the
# time it took leveldb to commit the batch
self.flush_state(self.db)
flush_time = int(self.last_flush - flush_start)
self.logger.info('flush #{:,d} to height {:,d} took {:,d}s'
.format(self.flush_count, self.height, flush_time))
# Log handy stats
txs_per_sec = int(self.tx_count / self.wall_time)
this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
if self.height > self.coin.TX_COUNT_HEIGHT:
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
else:
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
* self.coin.TX_PER_BLOCK
+ (self.coin.TX_COUNT - self.tx_count))
self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, '
'since last flush: {:,d}'
.format(self.tx_count, txs_per_sec, this_txs_per_sec))
self.logger.info('sync time: {} ETA: {}'
.format(formatted_time(self.wall_time),
formatted_time(tx_est / this_txs_per_sec)))
def flush_history(self, batch):
# Drop any None entry
self.history.pop(None, None)
self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count)
for hash168, hist in self.history.items():
key = b'H' + hash168 + flush_id
batch.put(key, hist.tobytes())
self.logger.info('{:,d} history entries in {:,d} addrs'
.format(self.history_size, len(self.history)))
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def read_headers(self, height, count):
header_len = self.coin.HEADER_LEN
self.headers_file.seek(height * header_len)
return self.headers_file.read(count * header_len)
def cache_sizes(self, daemon_height):
'''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and
# requesting size from Python (see deep_getsizeof). For
# whatever reason Python O/S mem usage is typically +30% or
# more, so we scale our already bloated object sizes.
one_MB = int(1048576 / 1.3)
utxo_cache_size = len(self.utxo_cache.cache) * 187
db_cache_size = len(self.utxo_cache.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
hist_MB = hist_cache_size // one_MB
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
.format(self.height, daemon_height))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size: {:,d}'
.format(len(self.utxo_cache.cache),
len(self.utxo_cache.db_cache),
len(self.history),
self.history_size))
self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
.format(utxo_MB + hist_MB, utxo_MB, hist_MB))
return utxo_MB, hist_MB
def process_block(self, block, daemon_height):
self.headers.append(block[:self.coin.HEADER_LEN])
tx_hashes, txs = self.coin.read_block(block)
self.height += 1
assert len(self.tx_counts) == self.height
# These both need to be updated before calling process_tx().
# It uses them for tx hash lookup
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(self.tx_count + len(txs))
for tx_hash, tx in zip(tx_hashes, txs):
self.process_tx(tx_hash, tx)
# Check if we're getting full and time to flush?
now = time.time()
if now > self.next_cache_check:
self.next_cache_check = now + 60
utxo_MB, hist_MB = self.cache_sizes(daemon_height)
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(daemon_height, utxo_MB >= self.utxo_MB)
def process_tx(self, tx_hash, tx):
cache = self.utxo_cache
tx_num = self.tx_count
# Add the outputs as new UTXOs; spend the inputs
hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
if not tx.is_coinbase:
for txin in tx.inputs:
hash168s.add(cache.spend(txin.prevout))
for hash168 in hash168s:
self.history[hash168].append(tx_num)
self.history_size += len(hash168s)
self.tx_count += 1
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height > self.fs_height:
tx_hashes = self.tx_hashes[height - (self.fs_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)
return tx_hash, height
@staticmethod
def resolve_limit(limit):
if limit is None:
return -1
assert isinstance(limit, int) and limit >= 0
return limit
def get_history(self, hash168, limit=1000):
'''Generator that returns an unpruned, sorted list of (tx_hash,
height) tuples of transactions that touched the address,
earliest in the blockchain first. Includes both spending and
receiving transactions. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
prefix = b'H' + hash168
for key, hist in self.db.iterator(prefix=prefix):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
if limit == 0:
return
yield self.get_tx_hash(tx_num)
limit -= 1
def get_balance(self, hash168):
'''Returns the confirmed balance of an address.'''
return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None))
def get_utxos(self, hash168, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
'''
limit = self.resolve_limit(limit)
unpack = struct.unpack
prefix = b'u' + hash168
utxos = []
for k, v in self.db.iterator(prefix=prefix):
(tx_pos, ) = unpack('<H', k[-2:])
for n in range(0, len(v), 12):
if limit == 0:
return
(tx_num, ) = unpack('<I', v[n:n+4])
(value, ) = unpack('<Q', v[n+4:n+12])
tx_hash, height = self.get_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)
limit -= 1
def get_utxos_sorted(self, hash168):
'''Returns all the UTXOs for an address sorted by height and
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))

39
server/env.py

@ -1,28 +1,37 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import logging
from os import environ
from lib.coins import Coin
from lib.util import LoggedClass
class Env(object):
class Env(LoggedClass):
'''Wraps environment configuration.'''
class Error(Exception):
pass
def __init__(self):
self.logger = logging.getLogger('Env')
self.logger.setLevel(logging.INFO)
super().__init__()
coin_name = self.default('COIN', 'Bitcoin')
network = self.default('NETWORK', 'mainnet')
self.coin = Coin.lookup_coin_class(coin_name, network)
self.db_dir = self.required('DB_DIRECTORY')
self.utxo_MB = self.integer('UTXO_MB', 1000)
self.hist_MB = self.integer('HIST_MB', 250)
self.rpc_url = self.build_rpc_url()
self.host = self.default('HOST', 'localhost')
self.reorg_limit = self.integer('REORG_LIMIT', 200)
self.daemon_url = self.build_daemon_url()
# Server stuff
self.tcp_port = self.integer('TCP_PORT', None)
self.ssl_port = self.integer('SSL_PORT', None)
self.rpc_port = self.integer('RPC_PORT', 8000)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None)
# The electrum client takes the empty string as unspecified
self.donation_address = self.default('DONATION_ADDRESS', '')
def default(self, envvar, default):
return environ.get(envvar, default)
@ -43,13 +52,13 @@ class Env(object):
raise self.Error('cannot convert envvar {} value {} to an integer'
.format(envvar, value))
def build_rpc_url(self):
rpc_url = environ.get('RPC_URL')
if not rpc_url:
rpc_username = self.required('RPC_USERNAME')
rpc_password = self.required('RPC_PASSWORD')
rpc_host = self.required('RPC_HOST')
rpc_port = self.default('RPC_PORT', self.coin.DEFAULT_RPC_PORT)
rpc_url = ('http://{}:{}@{}:{}/'
.format(rpc_username, rpc_password, rpc_host, rpc_port))
return rpc_url
def build_daemon_url(self):
daemon_url = environ.get('DAEMON_URL')
if not daemon_url:
username = self.required('DAEMON_USERNAME')
password = self.required('DAEMON_PASSWORD')
host = self.required('DAEMON_HOST')
port = self.default('DAEMON_PORT', self.coin.DEFAULT_RPC_PORT)
daemon_url = ('http://{}:{}@{}:{}/'
.format(username, password, host, port))
return daemon_url

216
server/protocol.py

@ -0,0 +1,216 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import asyncio
import codecs
import json
import traceback
from functools import partial
from lib.util import LoggedClass
from server.version import VERSION
class Error(Exception):
BAD_REQUEST = 1
INTERNAL_ERROR = 2
class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self, controller):
super().__init__()
self.controller = controller
self.parts = []
def connection_made(self, transport):
self.transport = transport
peername = transport.get_extra_info('peername')
self.logger.info('connection from {}'.format(peername))
self.controller.add_session(self)
def connection_lost(self, exc):
self.logger.info('disconnected')
self.controller.remove_session(self)
def data_received(self, data):
while True:
npos = data.find(ord('\n'))
if npos == -1:
break
tail, data = data[:npos], data[npos + 1:]
parts = self.parts
self.parts = []
parts.append(tail)
self.decode_message(b''.join(parts))
if data:
self.parts.append(data)
def decode_message(self, message):
'''Message is a binary message.'''
try:
message = json.loads(message.decode())
except Exception as e:
self.logger.info('caught exception decoding message'.format(e))
return
job = self.request_handler(message)
self.controller.add_job(job)
async def request_handler(self, request):
'''Called asynchronously.'''
error = result = None
try:
result = await self.json_handler(request)
except Error as e:
error = {'code': e.args[0], 'message': e.args[1]}
except asyncio.CancelledError:
raise
except Exception as e:
# This should be considered a bug and fixed
traceback.print_exc()
error = {'code': Error.INTERNAL_ERROR, 'message': str(e)}
payload = {'id': request.get('id'), 'error': error, 'result': result}
try:
data = json.dumps(payload) + '\n'
except TypeError:
msg = 'cannot JSON encode response to request {}'.format(request)
self.logger.error(msg)
error = {'code': Error.INTERNAL_ERROR, 'message': msg}
payload = {'id': request.get('id'), 'error': error, 'result': None}
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
async def json_handler(self, request):
method = request.get('method')
handler = None
if isinstance(method, str):
handler_name = 'handle_{}'.format(method.replace('.', '_'))
handler = getattr(self, handler_name, None)
if not handler:
self.logger.info('unknown method: {}'.format(method))
raise Error(Error.BAD_REQUEST, 'unknown method: {}'.format(method))
params = request.get('params', [])
if not isinstance(params, list):
raise Error(Error.BAD_REQUEST, 'params should be an array')
return await handler(params)
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, controller, daemon, env):
super().__init__(controller)
self.daemon = daemon
self.env = env
self.addresses = set()
self.subscribe_headers = False
def params_to_hash168(self, params):
if len(params) != 1:
raise Error(Error.BAD_REQUEST,
'params should contain a single address')
address = params[0]
try:
return self.env.coin.address_to_hash168(address)
except:
raise Error(Error.BAD_REQUEST,
'invalid address: {}'.format(address))
async def handle_blockchain_address_get_history(self, params):
hash168 = self.params_to_hash168(params)
return self.controller.get_history(hash168)
async def handle_blockchain_address_subscribe(self, params):
hash168 = self.params_to_hash168(params)
status = self.controller.address_status(hash168)
return status.hex() if status else None
async def handle_blockchain_estimatefee(self, params):
result = await self.daemon.send_single('estimatefee', params)
return result
async def handle_blockchain_headers_subscribe(self, params):
self.subscribe_headers = True
return self.controller.get_current_header()
async def handle_blockchain_relayfee(self, params):
'''The minimum fee a low-priority tx must pay in order to be accepted
to this daemon's memory pool.
'''
net_info = await self.daemon.send_single('getnetworkinfo')
return net_info['relayfee']
async def handle_blockchain_transaction_get(self, params):
if len(params) != 1:
raise Error(Error.BAD_REQUEST,
'params should contain a transaction hash')
tx_hash = params[0]
return await self.daemon.send_single('getrawtransaction', (tx_hash, 0))
async def handle_blockchain_transaction_get_merkle(self, params):
if len(params) != 2:
raise Error(Error.BAD_REQUEST,
'params should contain a transaction hash and height')
tx_hash, height = params
return await self.controller.get_merkle(tx_hash, height)
async def handle_server_banner(self, params):
'''Return the server banner.'''
banner = 'Welcome to Electrum!'
if self.env.banner_file:
try:
with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
banner = f.read()
except Exception as e:
self.logger.error('reading banner file {}: {}'
.format(self.env.banner_file, e))
return banner
async def handle_server_donation_address(self, params):
'''Return the donation address as a string.
If none is specified return the empty string.
'''
return self.env.donation_address
async def handle_server_peers_subscribe(self, params):
'''Returns the peer (ip, host, ports) tuples.
Despite the name electrum-server does not treat this as a
subscription.
'''
peers = self.controller.get_peers()
return tuple(peers.values())
async def handle_server_version(self, params):
'''Return the server version as a string.'''
return VERSION
class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.'''
async def handle_getinfo(self, params):
return {
'blocks': self.controller.height(),
'peers': len(self.controller.get_peers()),
'sessions': len(self.controller.sessions),
'watched': sum(len(s.addresses) for s in self.controller.sessions
if isinstance(s, ElectrumX)),
'cached': 0,
}
async def handle_sessions(self, params):
return []
async def handle_numsessions(self, params):
return len(self.controller.sessions)
async def handle_peers(self, params):
return tuple(self.controller.get_peers().keys())
async def handle_numpeers(self, params):
return len(self.controller.get_peers())

174
server/server.py

@ -1,174 +0,0 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import asyncio
import json
import logging
import os
import signal
import time
from functools import partial
import aiohttp
from server.db import DB
class Server(object):
def __init__(self, env):
self.env = env
self.db = DB(env)
self.block_cache = BlockCache(env, self.db)
self.tasks = [
asyncio.ensure_future(self.block_cache.catch_up()),
asyncio.ensure_future(self.block_cache.process_cache()),
]
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
def on_signal(self, signame):
logging.warning('received {} signal, preparing to shut down'
.format(signame))
for task in self.tasks:
task.cancel()
def async_tasks(self):
return self.tasks
class BlockCache(object):
'''Requests blocks ahead of time from the daemon. Serves them
to the blockchain processor.'''
def __init__(self, env, db):
self.logger = logging.getLogger('BlockCache')
self.logger.setLevel(logging.INFO)
self.db = db
self.rpc_url = env.rpc_url
# Cache target size is in MB. Has little effect on sync time.
self.cache_limit = 10
self.daemon_height = 0
self.fetched_height = db.height
# Blocks stored in reverse order. Next block is at end of list.
self.blocks = []
self.recent_sizes = []
self.ave_size = 0
self.logger.info('using RPC URL {}'.format(self.rpc_url))
async def process_cache(self):
while True:
await asyncio.sleep(1)
while self.blocks:
self.db.process_block(self.blocks.pop(), self.daemon_height)
# Release asynchronous block fetching
await asyncio.sleep(0)
async def catch_up(self):
self.logger.info('catching up, block cache limit {:d}MB...'
.format(self.cache_limit))
try:
while await self.maybe_prefill():
await asyncio.sleep(1)
self.logger.info('caught up to height {:d}'
.format(self.daemon_height))
finally:
self.db.flush(self.daemon_height, True)
def cache_used(self):
return sum(len(block) for block in self.blocks)
def prefill_count(self, room):
count = 0
if self.ave_size:
count = room // self.ave_size
return max(count, 10)
async def maybe_prefill(self):
'''Returns False to stop. True to sleep a while for asynchronous
processing.'''
cache_limit = self.cache_limit * 1024 * 1024
while True:
cache_used = self.cache_used()
if cache_used > cache_limit:
return True
# Keep going by getting a whole new cache_limit of blocks
self.daemon_height = await self.send_single('getblockcount')
max_count = min(self.daemon_height - self.fetched_height, 4000)
count = min(max_count, self.prefill_count(cache_limit))
if not count:
return False # Done catching up
first = self.fetched_height + 1
param_lists = [[height] for height in range(first, first + count)]
hashes = await self.send_vector('getblockhash', param_lists)
# Hashes is an array of hex strings
param_lists = [(h, False) for h in hashes]
blocks = await self.send_vector('getblock', param_lists)
self.fetched_height += count
# Convert hex string to bytes and put in memoryview
blocks = [bytes.fromhex(block) for block in blocks]
# Reverse order and place at front of list
self.blocks = list(reversed(blocks)) + self.blocks
# Keep 50 most recent block sizes for fetch count estimation
sizes = [len(block) for block in blocks]
self.recent_sizes.extend(sizes)
excess = len(self.recent_sizes) - 50
if excess > 0:
self.recent_sizes = self.recent_sizes[excess:]
self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
async def send_single(self, method, params=None):
payload = {'method': method}
if params:
payload['params'] = params
result, = await self.send((payload, ))
return result
async def send_many(self, mp_pairs):
payload = [{'method': method, 'params': params}
for method, params in mp_pairs]
return await self.send(payload)
async def send_vector(self, method, params_list):
payload = [{'method': method, 'params': params}
for params in params_list]
return await self.send(payload)
async def send(self, payload):
assert isinstance(payload, (tuple, list))
data = json.dumps(payload)
while True:
try:
async with aiohttp.request('POST', self.rpc_url,
data = data) as resp:
result = await resp.json()
except asyncio.CancelledError:
raise
except Exception as e:
msg = 'aiohttp error: {}'.format(e)
secs = 3
else:
errs = tuple(item['error'] for item in result)
if not any(errs):
return tuple(item['result'] for item in result)
if any(err.get('code') == -28 for err in errs):
msg = 'daemon still warming up.'
secs = 30
else:
msg = 'daemon errors: {}'.format(errs)
secs = 3
self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs))
await asyncio.sleep(secs)

1
server/version.py

@ -0,0 +1 @@
VERSION = "ElectrumX 0.02"

12
server_main.py

@ -9,7 +9,7 @@ import os
import traceback
from server.env import Env
from server.server import Server
from server.controller import Controller
def main_loop():
@ -22,15 +22,19 @@ def main_loop():
logging.info('switching current directory to {}'.format(env.db_dir))
os.chdir(env.db_dir)
server = Server(env)
tasks = server.async_tasks()
loop = asyncio.get_event_loop()
#loop.set_debug(True)
controller = Controller(loop, env)
controller.start()
tasks = asyncio.Task.all_tasks(loop)
try:
loop.run_until_complete(asyncio.gather(*tasks))
except asyncio.CancelledError:
logging.warning('task cancelled; asyncio event loop closing')
finally:
controller.stop()
loop.close()

Loading…
Cancel
Save