Browse Source

Merge branch 'release-0.01'

master
Neil Booth 8 years ago
parent
commit
3872f0f4cf
  1. 130
      HOWTO.rst
  2. 35
      README.rst
  3. 4
      lib/coins.py
  4. 43
      lib/util.py
  5. 2
      query.py
  6. 27
      samples/scripts/NOTES
  7. 1
      samples/scripts/env/CACHE_MB
  8. 1
      samples/scripts/env/FLUSH_SIZE
  9. 1
      samples/scripts/env/HIST_MB
  10. 764
      server/db.py
  11. 3
      server/env.py
  12. 177
      server/server.py
  13. 2
      server_main.py

130
HOWTO.rst

@ -107,8 +107,8 @@ You can see its logs with::
tail -F /path/to/log/dir/current | tai64nlocal
Progress
========
Sync Progress
=============
Speed indexing the blockchain depends on your hardware of course. As
Python is single-threaded most of the time only 1 core is kept busy.
@ -120,30 +120,20 @@ may even be beneficial to have the daemon on a separate machine so the
machine doing the indexing is focussing on the one task and not the
wider network.
The FLUSH_SIZE environment variable is an upper bound on how much
unflushed data is cached before writing to disk + leveldb. The
default is 4 million items, which is probably fine unless your
hardware is quite poor. If you've got a really fat machine with lots
of RAM, 10 million or even higher is likely good (I used 10 million on
Machine B below without issue so far). A higher number will have
fewer flushes and save your disk thrashing, but you don't want it so
high your machine is swapping. If your machine loses power all
synchronization since the previous flush is lost.
When syncing, ElectrumX is CPU bound over 70% of the time, with the
rest being bursts of disk activity whilst flushing. Here is my
experience with the current codebase, to given heights and rough
wall-time::
The HIST_MB and CACHE_MB environment variables control cache sizes
before they spill to disk; see the NOTES file under samples/scripts.
Here is my experience with the current codebase, to given heights and
rough wall-time::
Machine A Machine B DB + Metadata
100,000 2m 30s 0 (unflushed)
150,000 35m 4m 30s 0.2 GiB
180,000 1h 5m 9m 0.4 GiB
245,800 3h 1h 30m 2.7 GiB
290,000 13h 15m 3h 5m 3.3 GiB
307,000 17h 16m 3h 50m 4.1 GiB
343,000 6h 54m 6.0 GiB
386,600 17h 07m 7.0 GiB
180,000 7m 10s 0.4 GiB
245,800 1h 00m 2.7 GiB
290,000 1h 56m 3.3 GiB
343,000 3h 56m 6.0 GiB
386,000 7h 28m 7.0 GiB
404,000 9h 41m
434,369 14h 38m 17.1 GiB
Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB
RAM and a DragonFlyBSD HAMMER fileystem on an SSD. It requests blocks
@ -157,10 +147,6 @@ quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on
the same machine. FLUSH_SIZE of 10 million. First flush at height
195,146.
Transactions processed per second seems to gradually decrease over
time but this statistic is not currently logged and I've not looked
closely.
For chains other than bitcoin-mainnet sychronization should be much
faster.
@ -184,19 +170,11 @@ with the intent that, to the extent this atomicity guarantee holds,
the database should not get corrupted even if the ElectrumX process if
forcibly killed or there is loss of power. The worst case is losing
unflushed in-memory blockchain processing and having to restart from
the state as of the prior successfully completed flush.
During development I have terminated ElectrumX processes in various
ways and at random times, and not once have I had any corruption as a
result of doing so. Mmy only DB corruption has been through buggy
code. If you do have any database corruption as a result of
terminating the process without modifying the code I would be very
interested in hearing details.
the state as of the prior successfully completed UTXO flush.
I have heard about corruption issues with electrum-server. I cannot
be sure but with a brief look at the code it does seem that if
interrupted at the wrong time the databases it uses could become
inconsistent.
If you do have any database corruption as a result of terminating the
process (without modifying the code) I would be interested in the
details.
Once the process has terminated, you can start it up again with::
@ -211,7 +189,6 @@ same service directory, such as a testnet or altcoin server. See the
man pages of these various commands for more information.
Understanding the Logs
======================
@ -221,49 +198,38 @@ You can see the logs usefully like so::
Here is typical log output on startup::
2016-10-08 14:46:48.088516500 Launching ElectrumX server...
2016-10-08 14:46:49.145281500 INFO:root:ElectrumX server starting
2016-10-08 14:46:49.147215500 INFO:root:switching current directory to /var/nohist/server-test
2016-10-08 14:46:49.150765500 INFO:DB:using flush size of 1,000,000 entries
2016-10-08 14:46:49.156489500 INFO:DB:created new database Bitcoin-mainnet
2016-10-08 14:46:49.157531500 INFO:DB:flushing to levelDB 0 txs and 0 blocks to height -1 tx count: 0
2016-10-08 14:46:49.158640500 INFO:DB:flushed. Cache hits: 0/0 writes: 5 deletes: 0 elided: 0 sync: 0d 00h 00m 00s
2016-10-08 14:46:49.159508500 INFO:RPC:using RPC URL http://user:pass@192.168.0.2:8332/
2016-10-08 14:46:49.167352500 INFO:BlockCache:catching up, block cache limit 10MB...
2016-10-08 14:46:49.318374500 INFO:BlockCache:prefilled 10 blocks to height 10 daemon height: 433,401 block cache size: 2,150
2016-10-08 14:46:50.193962500 INFO:BlockCache:prefilled 4,000 blocks to height 4,010 daemon height: 433,401 block cache size: 900,043
2016-10-08 14:46:51.253644500 INFO:BlockCache:prefilled 4,000 blocks to height 8,010 daemon height: 433,401 block cache size: 1,600,613
2016-10-08 14:46:52.195633500 INFO:BlockCache:prefilled 4,000 blocks to height 12,010 daemon height: 433,401 block cache size: 2,329,325
Under normal operation these prefill messages repeat fairly regularly.
Occasionally (depending on how big your FLUSH_SIZE environment
variable was set, and your hardware, this could be anything from every
5 minutes to every hour) you will get a flush to disk that begins with:
2016-10-08 06:34:20.841563500 INFO:DB:flushing to levelDB 828,190 txs and 3,067 blocks to height 243,982 tx count: 20,119,669
During the flush, which can take many minutes, you may see logs like
this:
2016-10-08 12:20:08.558750500 INFO:DB:address 1dice7W2AicHosf5EL3GFDUVga7TgtPFn hist moving to idx 3000
These are just informational messages about addresses that have very
large histories that are generated as those histories are being
written out. After the flush has completed a few stats are printed
about cache hits, the number of writes and deletes, and the number of
writes that were elided by the cache::
2016-10-08 06:37:41.035139500 INFO:DB:flushed. Cache hits: 3,185,958/192,336 writes: 781,526 deletes: 465,236 elided: 3,185,958 sync: 0d 06h 57m 03s
2016-10-14 20:22:10.747808500 Launching ElectrumX server...
2016-10-14 20:22:13.032415500 INFO:root:ElectrumX server starting
2016-10-14 20:22:13.032633500 INFO:root:switching current directory to /Users/neil/server-btc
2016-10-14 20:22:13.038495500 INFO:DB:created new database Bitcoin-mainnet
2016-10-14 20:22:13.038892500 INFO:DB:Bitcoin/mainnet height: -1 tx count: 0 flush count: 0 utxo flush count: 0 sync time: 0d 00h 00m 00s
2016-10-14 20:22:13.038935500 INFO:DB:flushing all after cache reaches 2,000 MB
2016-10-14 20:22:13.038978500 INFO:DB:flushing history cache at 400 MB
2016-10-14 20:22:13.039076500 INFO:BlockCache:using RPC URL http://user:password@192.168.0.2:8332/
2016-10-14 20:22:13.039796500 INFO:BlockCache:catching up, block cache limit 10MB...
2016-10-14 20:22:14.092192500 INFO:DB:cache stats at height 0 daemon height: 434,293
2016-10-14 20:22:14.092243500 INFO:DB: entries: UTXO: 1 DB: 0 hist count: 1 hist size: 1
2016-10-14 20:22:14.092288500 INFO:DB: size: 0MB (UTXOs 0MB hist 0MB)
2016-10-14 20:22:32.302394500 INFO:UTXO:duplicate tx hash d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
2016-10-14 20:22:32.310441500 INFO:UTXO:duplicate tx hash e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
2016-10-14 20:23:14.094855500 INFO:DB:cache stats at height 125,278 daemon height: 434,293
2016-10-14 20:23:14.095026500 INFO:DB: entries: UTXO: 191,155 DB: 0 hist count: 543,455 hist size: 1,394,187
2016-10-14 20:23:14.095028500 INFO:DB: size: 172MB (UTXOs 44MB hist 128MB)
Under normal operation these cache stats repeat roughly every minute.
Flushes can take many minutes and look like this::
2016-10-14 21:30:29.085479500 INFO:DB:flushing UTXOs: 22,910,848 txs and 254,753 blocks
2016-10-14 21:32:05.383413500 INFO:UTXO:UTXO cache adds: 55,647,862 spends: 48,751,219
2016-10-14 21:32:05.383460500 INFO:UTXO:UTXO DB adds: 6,875,315 spends: 0. Collisions: hash168: 268 UTXO: 0
2016-10-14 21:32:07.056008500 INFO:DB:6,982,386 history entries in 1,708,991 addrs
2016-10-14 21:32:08.169468500 INFO:DB:committing transaction...
2016-10-14 21:33:17.644296500 INFO:DB:flush #11 to height 254,752 took 168s
2016-10-14 21:33:17.644357500 INFO:DB:txs: 22,910,848 tx/sec since genesis: 5,372, since last flush: 3,447
2016-10-14 21:33:17.644536500 INFO:DB:sync time: 0d 01h 11m 04s ETA: 0d 11h 22m 42s
After flush-to-disk you may see an aiohttp error; this is the daemon
timing out the connection while the disk flush was in progress. This
is harmless; I intend to fix this soon by yielding whilst flushing.
You may see one or two logs about UTXOs or hash160 key collisions::
2016-10-08 07:24:34.068609500 INFO:DB:UTXO compressed key collision at height 252943 utxo 115cc1408e5321636675a8fcecd204661a6f27b4b7482b1b7c4402ca4b94b72f / 1
These are informational messages about an artefact of the compression
scheme ElectrumX uses and are harmless. However, if you see more than
a handful of these, particularly close together, something is very
wrong and your DB is probably corrupt.
The ETA is just a guide and can be quite volatile.

35
README.rst

@ -41,11 +41,12 @@ Implementation
ElectrumX does not currently do any pruning. With luck it may never
become necessary. So how does it achieve a much more compact database
than Electrum server, which throws away a lot of information? And
sync faster to boot?
than Electrum server, which prunes a lot of hisory, and also sync
faster?
All of the following likely play a part:
- aggressive caching and batching of DB writes
- more compact representation of UTXOs, the mp address index, and
history. Electrum server stores full transaction hash and height
for all UTXOs. In its pruned history it does the same. ElectrumX
@ -58,32 +59,22 @@ All of the following likely play a part:
disk rather than in levelDB. It would be nice to do this for histories
but I cannot think how they could be easily indexable on a filesystem.
- avoiding unnecessary or redundant computations
- more efficient memory usage - through more compact data structures and
and judicious use of memoryviews
- big caches (controlled via FLUSH_SIZE)
- more efficient memory usage
- asyncio and asynchronous prefetch of blocks. With luck ElectrumX
will have no need of threads or locking primitives
- because it prunes electrum-server needs to store undo information,
ElectrumX should does not need to store undo information for
blockchain reorganisations (note blockchain reorgs are not yet
implemented in ElectrumX)
- finally electrum-server maintains a patricia tree of UTXOs. My
understanding is this is for future features and not currently
required. It's unclear precisely how this will be used or what
could replace or duplicate its functionality in ElectrumX. Since
ElectrumX stores all necessary blockchain metadata some solution
should exist.
Future/TODO
===========
- handling blockchain reorgs
- handling client connections (heh!)
- investigating leveldb space / speed tradeoffs
- seeking out further efficiencies. ElectrumX is CPU bound; it would not
surprise me if there is a way to cut CPU load by 10-20% more. To squeeze
even more out would probably require some things to move to C or C++.
Roadmap
=======
- test a few more performance improvement ideas
- handle blockchain reorgs
- handle client connections
- potentially move some functionality to C or C++
Once I get round to writing the server part, I will add DoS
protections if necessary to defend against requests for large

4
lib/coins.py

@ -137,7 +137,6 @@ class Coin(object):
@classmethod
def read_block(cls, block):
assert isinstance(block, memoryview)
d = Deserializer(block[cls.HEADER_LEN:])
return d.read_block()
@ -157,6 +156,9 @@ class Bitcoin(Coin):
WIF_BYTE = 0x80
GENESIS_HASH=(b'000000000019d6689c085ae165831e93'
b'4ff763ae46a2a6c172b3f1b60a8ce26f')
TX_COUNT = 142791895
TX_COUNT_HEIGHT = 420976
TX_PER_BLOCK = 1600
class BitcoinTestnet(Coin):
NAME = "Bitcoin"

43
lib/util.py

@ -1,8 +1,9 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import array
import sys
from collections import Container, Mapping
# Method decorator. To be used for calculations that will always
@ -25,6 +26,46 @@ class cachedproperty(object):
.format(self.f.__name__, obj))
def deep_getsizeof(obj):
"""Find the memory footprint of a Python object.
Based on from code.tutsplus.com: http://goo.gl/fZ0DXK
This is a recursive function that drills down a Python object graph
like a dictionary holding nested dictionaries with lists of lists
and tuples and sets.
The sys.getsizeof function does a shallow size of only. It counts each
object inside a container as pointer only regardless of how big it
really is.
:param o: the object
:return:
"""
ids = set()
def size(o):
if id(o) in ids:
return 0
r = sys.getsizeof(o)
ids.add(id(o))
if isinstance(o, (str, bytes, bytearray, array.array)):
return r
if isinstance(o, Mapping):
return r + sum(size(k) + size(v) for k, v in o.items())
if isinstance(o, Container):
return r + sum(size(x) for x in o)
return r
return size(obj)
def chunks(items, size):
for i in range(0, len(items), size):
yield items[i: i + size]

2
query.py

@ -33,7 +33,7 @@ def main():
n = None
for n, utxo in enumerate(db.get_utxos(hash168, limit)):
print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}'
.format(n, bytes(reversed(utxo.tx_hash)).hex(),
.format(n + 1, bytes(reversed(utxo.tx_hash)).hex(),
utxo.tx_pos, utxo.height, utxo.value))
if n is None:
print('No UTXOs')

27
samples/scripts/NOTES

@ -11,5 +11,28 @@ connecting to the daemon, or you must specify RPC_HOST, RPC_USER,
RPC_PASSWORD and optionally RPC_PORT (it defaults appropriately for
the coin and network otherwise).
The other environment variables are all optional and will adopt sensible defaults if not
specified.
The other environment variables are all optional and will adopt
sensible defaults if not specified.
Your performance might change by tweaking these cache settings. Cache
size is only checked roughly every minute, so the caches can grow
beyond the specified size. Also the Python process is often quite a
bit bigger than the combine cache size, because of Python overhead and
also because leveldb can consume quite a lot of memory during UTXO
flushing. So these are rough numbers only:
HIST_MB - amount of history cache, in MB, to retain before flushing to
disk. Default is 250; probably no benefit being much larger
as history is append-only and not searched.
CACHE_MB- amount of UTXO and history cache, in MB, to retain before
flushing to disk. Default is 1000. This may be too large
for small boxes or too small for machines with lots of RAM.
Larger caches generally perform better as there is
significant searching of the UTXO cache during indexing.
However, I don't see much benefit in my tests pushing this
beyond 2000, and in fact beyond there performance begins to
fall. My machine has 24GB RAM; the slow down is probably
because of leveldb caching and Python GC effects. However
this may be very dependent on hardware and you may have
different results.

1
samples/scripts/env/CACHE_MB

@ -0,0 +1 @@
1000

1
samples/scripts/env/FLUSH_SIZE

@ -1 +0,0 @@
4000000

1
samples/scripts/env/HIST_MB

@ -0,0 +1 @@
250

764
server/db.py

@ -2,6 +2,7 @@
# and warranty status of this software.
import array
import ast
import itertools
import os
import struct
@ -25,21 +26,260 @@ UTXO_TX_HASH_LEN = 4
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
def to_4_bytes(value):
return struct.pack('<I', value)
def formatted_time(t):
t = int(t)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
def from_4_bytes(b):
return struct.unpack('<I', b)[0]
class UTXOCache(object):
'''An in-memory UTXO cache, representing all changes to UTXO state
since the last DB flush.
class DB(object):
We want to store millions, perhaps 10s of millions of these in
memory for optimal performance during initial sync, because then
it is possible to spend UTXOs without ever going to the database
(other than as an entry in the address history, and there is only
one such entry per TX not per UTXO). So store them in a Python
dictionary with binary keys and values.
Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes)
Value: HASH168 + TX_NUM + VALUE (21 + 4 + 8 = 33 bytes)
That's 67 bytes of raw data. Python dictionary overhead means
each entry actually uses about 187 bytes of memory. So almost
11.5 million UTXOs can fit in 2GB of RAM. There are approximately
42 million UTXOs on bitcoin mainnet at height 433,000.
Semantics:
add: Add it to the cache dictionary.
spend: Remove it if in the cache dictionary.
Otherwise it's been flushed to the DB. Each UTXO
is responsible for two entries in the DB stored using
compressed keys. Mark both for deletion in the next
flush of the in-memory UTXO cache.
A UTXO is stored in the DB in 2 "tables":
1. The output value and tx number. Must be keyed with a
hash168 prefix so the unspent outputs and balance of an
arbitrary address can be looked up with a simple key
traversal.
Key: b'u' + hash168 + compressed_tx_hash + tx_idx
Value: a (tx_num, value) pair
2. Given a prevout, we need to be able to look up the UTXO key
to remove it. As is keyed by hash168 and that is not part
of the prevout, we need a hash168 lookup.
Key: b'h' + compressed tx_hash + tx_idx
Value: (hash168, tx_num) pair
The compressed TX hash is just the first few bytes of the hash of
the TX the UTXO is in (and needn't be the same number of bytes in
each table). As this is not unique there will be collisions;
tx_num is stored to resolve them. The collision rate is around
0.02% for the hash168 table, and almost zero for the UTXO table
(there are around 100 collisions in the whole bitcoin blockchain).
'''
def __init__(self, parent, db, coin):
self.logger = logging.getLogger('UTXO')
self.logger.setLevel(logging.INFO)
self.parent = parent
self.coin = coin
self.cache = {}
self.db = db
self.db_cache = {}
# Statistics
self.adds = 0
self.cache_hits = 0
self.db_deletes = 0
def add_many(self, tx_hash, tx_num, txouts):
'''Add a sequence of UTXOs to the cache, return the set of hash168s
seen.
Pass the hash of the TX it appears in, its TX number, and the
TX outputs.
'''
parse_script = ScriptPubKey.from_script
pack = struct.pack
tx_numb = pack('<I', tx_num)
hash168s = set()
self.adds += len(txouts)
for idx, txout in enumerate(txouts):
# Get the hash168. Ignore scripts we can't grok.
pk = parse_script(txout.pk_script, self.coin)
hash168 = pk.hash168
if not hash168:
continue
hash168s.add(hash168)
key = tx_hash + pack('<H', idx)
# Well-known duplicate coinbases from heights 91722-91880
# that destoyed 100 BTC forever:
# e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
# d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
#if key in self.cache:
# self.logger.info('duplicate tx hash {}'
# .format(bytes(reversed(tx_hash)).hex()))
# b''.join avoids this: https://bugs.python.org/issue13298
self.cache[key] = b''.join(
(hash168, tx_numb, pack('<Q', txout.value)))
return hash168s
def spend(self, prevout):
'''Spend a UTXO and return the address spent.
If the UTXO is not in the cache it must be on disk.
'''
# Fast track is it's in the cache
pack = struct.pack
key = b''.join((prevout.hash, pack('<H', prevout.n)))
value = self.cache.pop(key, None)
if value:
self.cache_hits += 1
return value[:21]
# Oh well. Find and remove it from the DB.
hash168 = self.hash168(prevout.hash, prevout.n)
if not hash168:
return None
self.db_deletes += 1
# Read the UTXO through the cache from the disk. We have to
# go through the cache because compressed keys can collide.
key = (b'u' + hash168 + prevout.hash[:UTXO_TX_HASH_LEN]
+ pack('<H', prevout.n))
data = self.cache_get(key)
if data is None:
# Uh-oh, this should not happen...
self.logger.error('found no UTXO for {} / {:d} key {}'
.format(bytes(reversed(prevout.hash)).hex(),
prevout.n, bytes(key).hex()))
return hash168
if len(data) == 12:
(tx_num, ) = struct.unpack('<I', data[:4])
self.cache_delete(key)
return hash168
# Resolve the compressed key collison. These should be
# extremely rare.
assert len(data) % 12 == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.parent.get_tx_hash(tx_num)
if prevout.hash == tx_hash:
data = data[:n] + data[n + 12:]
self.cache_write(key, data)
return hash168
HEIGHT_KEY = b'height'
TIP_KEY = b'tip'
GENESIS_KEY = b'genesis'
TX_COUNT_KEY = b'tx_count'
FLUSH_COUNT_KEY = b'flush_count'
WALL_TIME_KEY = b'wall_time'
raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx):
'''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is
indicates a non-standard script).
'''
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx)
data = self.cache_get(key)
if data is None:
# Assuming the DB is not corrupt, this indicates a
# successful spend of a non-standard script
# self.logger.info('ignoring spend of non-standard UTXO {} / {:d}'
# .format(bytes(reversed(tx_hash)).hex(), idx)))
return None
if len(data) == 25:
self.cache_delete(key)
return data[:21]
assert len(data) % 25 == 0
# Resolve the compressed key collision using the TX number
for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num)
if my_hash == tx_hash:
self.cache_write(key, data[:n] + data[n+25:])
return data[n:n+21]
raise Exception('could not resolve hash168 collision')
def cache_write(self, key, value):
'''Cache write of a (key, value) pair to the DB.'''
assert(bool(value))
self.db_cache[key] = value
def cache_delete(self, key):
'''Cache deletion of a key from the DB.'''
self.db_cache[key] = None
def cache_get(self, key):
'''Fetch a value from the DB through our write cache.'''
value = self.db_cache.get(key)
if value:
return value
return self.db.get(key)
def flush(self, batch):
'''Flush the cached DB writes and UTXO set to the batch.'''
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
hcolls = ucolls = 0
new_utxos = len(self.cache)
for cache_key, cache_value in self.cache.items():
# Frist write to the hash168 lookup table
key = b'h' + cache_key[:ADDR_TX_HASH_LEN] + cache_key[-2:]
value = cache_value[:25]
prior_value = self.cache_get(key)
if prior_value: # Should rarely happen
hcolls += 1
value += prior_value
self.cache_write(key, value)
# Next write the UTXO table
key = (b'u' + cache_value[:21] + cache_key[:UTXO_TX_HASH_LEN]
+ cache_key[-2:])
value = cache_value[-12:]
prior_value = self.cache_get(key)
if prior_value: # Should almost never happen
ucolls += 1
value += prior_value
self.cache_write(key, value)
# GC-ing this now can only help the levelDB write.
self.cache = {}
# Now we can update to the batch.
for key, value in self.db_cache.items():
if value:
batch.put(key, value)
else:
batch.delete(key)
self.db_cache = {}
self.logger.info('UTXO cache adds: {:,d} spends: {:,d} '
.format(self.adds, self.cache_hits))
self.logger.info('UTXO DB adds: {:,d} spends: {:,d}. '
'Collisions: hash168: {:,d} UTXO: {:,d}'
.format(new_utxos, self.db_deletes,
hcolls, ucolls))
self.adds = self.cache_hits = self.db_deletes = 0
class DB(object):
class Error(Exception):
pass
@ -48,168 +288,194 @@ class DB(object):
self.logger = logging.getLogger('DB')
self.logger.setLevel(logging.INFO)
# Meta
self.tx_hash_file_size = 16 * 1024 * 1024
self.cache_MB = env.cache_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.last_flush = time.time()
self.last_flush_tx_count = 0
self.coin = env.coin
self.flush_size = env.flush_size
self.logger.info('using flush size of {:,d} entries'
.format(self.flush_size))
self.tx_counts = array.array('I')
self.tx_hash_file_size = 4*1024*1024
# Unflushed items. Headers and tx_hashes have one entry per block
# Chain state (initialize to genesis in case of new DB)
self.db_height = -1
self.db_tx_count = 0
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.tip = self.coin.GENESIS_HASH
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin)
self.tx_count = self.db_tx_count
self.height = self.db_height
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
self.headers = []
self.tx_hashes = []
self.history = defaultdict(list)
self.writes_avoided = 0
self.read_cache_hits = 0
self.write_cache_hits = 0
self.hcolls = 0
# Things put in a batch are not visible until the batch is written,
# so use a cache.
# Semantics: a key/value pair in this dictionary represents the
# in-memory state of the DB. Anything in this dictionary will be
# written at the next flush.
self.write_cache = {}
# Read cache: a key/value pair in this dictionary represents
# something read from the DB; it is on-disk as of the prior
# flush. If a key is in write_cache that value is more
# recent. Any key in write_cache and not in read_cache has
# never hit the disk.
self.read_cache = {}
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} utxo flush count: {:,d} '
'sync time: {}'
.format(self.coin.NAME, self.coin.NET, self.height,
self.tx_count, self.flush_count,
self.utxo_flush_count,
formatted_time(self.wall_time)))
self.logger.info('flushing all after cache reaches {:,d} MB'
.format(self.cache_MB))
self.logger.info('flushing history cache at {:,d} MB'
.format(self.hist_MB))
def open_db(self, coin):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
is_new = False
try:
self.db = self.open_db(db_name, False)
db = plyvel.DB(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except:
self.db = self.open_db(db_name, True)
self.headers_file = self.open_file('headers', True)
self.txcount_file = self.open_file('txcount', True)
self.init_db()
db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
is_new = True
if is_new:
self.logger.info('created new database {}'.format(db_name))
self.flush_state(db)
else:
self.logger.info('successfully opened database {}'.format(db_name))
self.headers_file = self.open_file('headers')
self.txcount_file = self.open_file('txcount')
self.read_db()
self.read_state(db)
self.delete_excess_history(db)
# Note that DB_HEIGHT is the height of the next block to be written.
# So an empty DB has a DB_HEIGHT of 0 not -1.
self.tx_count = self.db_tx_count
self.height = self.db_height - 1
self.tx_counts.fromfile(self.txcount_file, self.db_height)
self.last_flush = time.time()
# FIXME: this sucks and causes issues with exceptions in init_db()
if self.tx_count == 0:
self.flush()
def open_db(self, db_name, create):
return plyvel.DB(db_name, create_if_missing=create,
error_if_exists=create,
compression=None)
# lru_cache_size=256*1024*1024)
def init_db(self):
self.db_height = 0
self.db_tx_count = 0
self.flush_count = 0
self.wall_time = 0
self.tip = self.coin.GENESIS_HASH
self.put(self.GENESIS_KEY, unhexlify(self.tip))
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
return db
def read_db(self):
genesis_hash = hexlify(self.get(self.GENESIS_KEY))
if genesis_hash != self.coin.GENESIS_HASH:
def read_state(self, db):
state = db.get(b'state')
state = ast.literal_eval(state.decode('ascii'))
if state['genesis'] != self.coin.GENESIS_HASH:
raise self.Error('DB genesis hash {} does not match coin {}'
.format(genesis_hash, self.coin.GENESIS_HASH))
self.db_height = from_4_bytes(self.get(self.HEIGHT_KEY))
self.db_tx_count = from_4_bytes(self.get(self.TX_COUNT_KEY))
self.flush_count = from_4_bytes(self.get(self.FLUSH_COUNT_KEY))
self.wall_time = from_4_bytes(self.get(self.WALL_TIME_KEY))
self.tip = hexlify(self.get(self.TIP_KEY))
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
'flush count: {:,d} sync time: {}'
.format(self.coin.NAME, self.coin.NET,
self.db_height - 1, self.db_tx_count,
self.flush_count, self.formatted_wall_time()))
def formatted_wall_time(self):
wall_time = int(self.wall_time)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
wall_time // 86400, (wall_time % 86400) // 3600,
(wall_time % 3600) // 60, wall_time % 60)
def get(self, key):
# Get a key from write_cache, then read_cache, then the DB
value = self.write_cache.get(key)
if not value:
value = self.read_cache.get(key)
if not value:
value = self.db.get(key)
self.read_cache[key] = value
else:
self.read_cache_hits += 1
else:
self.write_cache_hits += 1
return value
.format(state['genesis_hash'],
self.coin.GENESIS_HASH))
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.tip = state['tip']
self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.last_flush_tx_count = self.db_tx_count
self.last_flush = time.time()
def put(self, key, value):
assert(bool(value))
self.write_cache[key] = value
def delete(self, key):
# Deleting an on-disk key requires a later physical delete
# If it's not on-disk we can just drop it entirely
if self.read_cache.get(key) is None:
self.writes_avoided += 1
self.write_cache.pop(key, None)
else:
self.write_cache[key] = None
def delete_excess_history(self, db):
'''Clear history flushed since the most recent UTXO flush.'''
utxo_flush_count = self.utxo_flush_count
diff = self.flush_count - utxo_flush_count
if diff == 0:
return
if diff < 0:
raise self.Error('DB corrupt: flush_count < utxo_flush_count')
self.logger.info('DB not shut down cleanly. Scanning for most '
'recent {:,d} history flushes'.format(diff))
prefix = b'H'
unpack = struct.unpack
keys = []
for key, hist in db.iterator(prefix=prefix):
flush_id, = unpack('>H', key[-2:])
if flush_id > self.utxo_flush_count:
keys.append(key)
self.logger.info('deleting {:,d} history entries'.format(len(keys)))
with db.write_batch(transaction=True) as batch:
for key in keys:
db.delete(key)
self.utxo_flush_count = self.flush_count
self.flush_state(batch)
self.logger.info('deletion complete')
def flush(self):
'''Flush out all cached state.'''
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.tip,
'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
}
batch.put(b'state', repr(state).encode('ascii'))
def flush(self, daemon_height, flush_utxos=False):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.db_tx_count
height_diff = self.height + 1 - self.db_height
self.logger.info('starting flush {:,d} txs and {:,d} blocks'
.format(tx_diff, height_diff))
# Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to
# have updated the DB. Flush state last as it reads the wall
# time.
self.flush_to_fs()
if flush_utxos:
# Write out the files to the FS before flushing to the DB.
# If the DB transaction fails, the files being too long
# doesn't matter. But if writing the files fails we do
# not want to have updated the DB. Flush state last as it
# reads the wall time.
self.logger.info('flushing UTXOs: {:,d} txs and {:,d} blocks'
.format(self.tx_count - self.db_tx_count,
self.height - self.db_height))
self.flush_to_fs()
else:
self.logger.info('commencing history flush')
with self.db.write_batch(transaction=True) as batch:
self.flush_cache(batch)
# History first - fast and frees memory
self.flush_history(batch)
self.logger.info('flushed history...')
if flush_utxos:
self.utxo_cache.flush(batch)
self.utxo_flush_count = self.flush_count
self.db_tx_count = self.tx_count
self.db_height = self.height
self.flush_state(batch)
self.logger.info('committing transaction...')
# Update and put the wall time again - otherwise we drop the
# time it takes leveldb to commit the batch
self.update_wall_time(self.db)
# time it took leveldb to commit the batch
self.flush_state(self.db)
flush_time = int(self.last_flush - flush_start)
self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} '
'flush count {:,d}'
.format(flush_time, self.height, self.tx_count,
self.flush_count))
self.logger.info('flush #{:,d} to height {:,d} took {:,d}s'
.format(self.flush_count, self.height, flush_time))
# Log handy stats
tx_diff = self.tx_count - self.last_flush_tx_count
self.last_flush_tx_count = self.tx_count
txs_per_sec = int(self.tx_count / self.wall_time)
this_txs_per_sec = int(tx_diff / (self.last_flush - last_flush))
self.logger.info('tx/s since genesis: {:,d} since last flush: {:,d} '
'sync time {}'
.format(txs_per_sec, this_txs_per_sec,
self.formatted_wall_time()))
# Note this preserves semantics and hopefully saves time
self.read_cache = self.write_cache
self.write_cache = {}
self.writes_avoided = 0
self.read_cache_hits = 0
self.write_cache_hits = 0
this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
if self.height > self.coin.TX_COUNT_HEIGHT:
tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
else:
tx_est = ((daemon_height - self.coin.TX_COUNT_HEIGHT)
* self.coin.TX_PER_BLOCK
+ (self.coin.TX_COUNT - self.tx_count))
self.logger.info('txs: {:,d} tx/sec since genesis: {:,d}, '
'since last flush: {:,d}'
.format(self.tx_count, txs_per_sec, this_txs_per_sec))
self.logger.info('sync time: {} ETA: {}'
.format(formatted_time(self.wall_time),
formatted_time(tx_est / this_txs_per_sec)))
def flush_to_fs(self):
'''Flush the things stored on the filesystem.'''
@ -218,145 +484,26 @@ class DB(object):
self.write_tx_hashes()
os.sync()
def update_wall_time(self, dest):
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
def flush_state(self, batch):
self.db_tx_count = self.tx_count
self.db_height = self.height + 1
batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count))
batch.put(self.TIP_KEY, unhexlify(self.tip))
self.update_wall_time(batch)
self.flush_count += 1
def flush_cache(self, batch):
'''Flushes the UTXO write cache.'''
deletes = writes = 0
for n, (key, value) in enumerate(self.write_cache.items()):
if value is None:
batch.delete(key)
deletes += 1
else:
batch.put(key, value)
writes += 1
self.logger.info('flushed UTXO cache. Hits: {:,d}/{:,d} '
'writes: {:,d} deletes: {:,d} elided: {:,d}'
.format(self.write_cache_hits,
self.read_cache_hits, writes, deletes,
self.writes_avoided))
def flush_history(self, batch):
# Drop any None entry
self.history.pop(None, None)
self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count)
for hash168, hist in self.history.items():
key = b'H' + hash168 + flush_id
batch.put(key, array.array('I', hist).tobytes())
self.history = defaultdict(list)
batch.put(key, hist.tobytes())
def get_hash168(self, tx_hash, idx, delete=True):
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + struct.pack('<H', idx)
data = self.get(key)
if data is None:
return None
self.logger.info('{:,d} history entries in {:,d} addrs'
.format(self.history_size, len(self.history)))
if len(data) == 25:
if delete:
self.delete(key)
return data[:21]
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
assert len(data) % 25 == 0
self.hcolls += 1
if self.hcolls % 1000 == 0:
self.logger.info('{} total hash168 compressed key collisions'
.format(self.hcolls))
for n in range(0, len(data), 25):
(tx_num, ) = struct.unpack('<I', data[n+21 : n+25])
my_hash, height = self.get_tx_hash(tx_num)
if my_hash == tx_hash:
if delete:
self.put(key, data[:n] + data[n+25:])
return data[n : n+21]
raise Exception('could not resolve hash168 collision')
def spend_utxo(self, prevout):
hash168 = self.get_hash168(prevout.hash, prevout.n)
if hash168 is None:
# This indicates a successful spend of a non-standard script
# self.logger.info('ignoring spend of non-standard UTXO {}/{:d} '
# 'at height {:d}'
# .format(bytes(reversed(prevout.hash)).hex(),
# prevout.n, self.height))
return None
key = (b'u' + hash168 + prevout.hash[:UTXO_TX_HASH_LEN]
+ struct.pack('<H', prevout.n))
data = self.get(key)
if data is None:
# Uh-oh, this should not happen. It may be recoverable...
self.logger.error('found no UTXO for {} / {:d} key {}'
.format(bytes(reversed(prevout.hash)).hex(),
prevout.n, bytes(key).hex()))
return hash168
if len(data) == 12:
(tx_num, ) = struct.unpack('<I', data[:4])
self.delete(key)
else:
# This should almost never happen
assert len(data) % (4 + 8) == 0
for n in range(0, len(data), 12):
(tx_num, ) = struct.unpack('<I', data[n:n+4])
tx_hash, height = self.get_tx_hash(tx_num)
if prevout.hash == tx_hash:
break
else:
raise Exception('could not resolve UTXO key collision')
data = data[:n] + data[n + 12:]
self.put(key, data)
return hash168
def put_utxo(self, tx_hash, idx, txout):
pk = ScriptPubKey.from_script(txout.pk_script, self.coin)
if not pk.hash168:
return None
pack = struct.pack
idxb = pack('<H', idx)
txcb = pack('<I', self.tx_count)
# First write the hash168 lookup
key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idxb
# b''.join avoids this: https://bugs.python.org/issue13298
value = b''.join((pk.hash168, txcb))
prior_value = self.get(key)
if prior_value: # Should almost never happen
value += prior_value
self.put(key, value)
# Next write the UTXO
key = b'u' + pk.hash168 + tx_hash[:UTXO_TX_HASH_LEN] + idxb
value = txcb + pack('<Q', txout.value)
prior_value = self.get(key)
if prior_value: # Should almost never happen
value += prior_value
self.put(key, value)
return pk.hash168
def open_file(self, filename, truncate=False, create=False):
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'wb+' if truncate else 'rb+')
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
@ -371,14 +518,15 @@ class DB(object):
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
assert len(headers) % header_len == 0
self.headers_file.seek(self.db_height * header_len)
self.headers_file.seek((self.db_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
self.headers = []
def write_tx_counts(self):
self.txcount_file.seek(self.db_height * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.db_height: self.height + 1])
self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.db_height + 1:
self.height + 1])
self.txcount_file.flush()
def write_tx_hashes(self):
@ -391,7 +539,7 @@ class DB(object):
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:05d}'.format(file_num)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
@ -399,7 +547,33 @@ class DB(object):
file_pos += size
self.tx_hashes = []
def process_block(self, block):
def cache_sizes(self, daemon_height):
'''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and
# requesting size from Python (see deep_getsizeof). For
# whatever reason Python O/S mem usage is typically +30% or
# more, so we scale our already bloated object sizes.
one_MB = int(1048576 / 1.3)
utxo_cache_size = len(self.utxo_cache.cache) * 187
db_cache_size = len(self.utxo_cache.db_cache) * 105
hist_cache_size = len(self.history) * 180 + self.history_size * 4
utxo_MB = (db_cache_size + utxo_cache_size) // one_MB
hist_MB = hist_cache_size // one_MB
cache_MB = utxo_MB + hist_MB
self.logger.info('cache stats at height {:,d} daemon height: {:,d}'
.format(self.height, daemon_height))
self.logger.info(' entries: UTXO: {:,d} DB: {:,d} '
'hist addrs: {:,d} hist size: {:,d}'
.format(len(self.utxo_cache.cache),
len(self.utxo_cache.db_cache),
len(self.history),
self.history_size))
self.logger.info(' size: {:,d}MB (UTXOs {:,d}MB hist {:,d}MB)'
.format(cache_MB, utxo_MB, hist_MB))
return cache_MB, hist_MB
def process_block(self, block, daemon_height):
self.headers.append(block[:self.coin.HEADER_LEN])
tx_hashes, txs = self.coin.read_block(block)
@ -415,21 +589,27 @@ class DB(object):
for tx_hash, tx in zip(tx_hashes, txs):
self.process_tx(tx_hash, tx)
# Flush if we're getting full
if len(self.write_cache) + len(self.history) > self.flush_size:
self.flush()
# Check if we're getting full and time to flush?
now = time.time()
if now > self.next_cache_check:
self.next_cache_check = now + 60
cache_MB, hist_MB = self.cache_sizes(daemon_height)
if cache_MB >= self.cache_MB or hist_MB >= self.hist_MB:
self.flush(daemon_height, cache_MB >= self.cache_MB)
def process_tx(self, tx_hash, tx):
hash168s = set()
cache = self.utxo_cache
tx_num = self.tx_count
# Add the outputs as new UTXOs; spend the inputs
hash168s = cache.add_many(tx_hash, tx_num, tx.outputs)
if not tx.is_coinbase:
for txin in tx.inputs:
hash168s.add(self.spend_utxo(txin.prevout))
for idx, txout in enumerate(tx.outputs):
hash168s.add(self.put_utxo(tx_hash, idx, txout))
hash168s.add(cache.spend(txin.prevout))
for hash168 in hash168s:
self.history[hash168].append(self.tx_count)
self.history[hash168].append(tx_num)
self.history_size += len(hash168s)
self.tx_count += 1
@ -438,13 +618,13 @@ class DB(object):
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height >= self.db_height:
tx_hashes = self.tx_hashes[height - self.db_height]
if height > self.db_height:
tx_hashes = self.tx_hashes[height - (self.db_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:05d}'.format(file_num)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)

3
server/env.py

@ -20,7 +20,8 @@ class Env(object):
network = self.default('NETWORK', 'mainnet')
self.coin = Coin.lookup_coin_class(coin_name, network)
self.db_dir = self.required('DB_DIRECTORY')
self.flush_size = self.integer('FLUSH_SIZE', 1000000)
self.cache_MB = self.integer('CACHE_MB', 1000)
self.hist_MB = self.integer('HIST_MB', 250)
self.rpc_url = self.build_rpc_url()
def default(self, envvar, default):

177
server/server.py

@ -19,78 +19,67 @@ class Server(object):
def __init__(self, env):
self.env = env
self.db = DB(env)
self.rpc = RPC(env)
self.block_cache = BlockCache(env, self.db, self.rpc)
def async_tasks(self):
return [
self.block_cache = BlockCache(env, self.db)
self.tasks = [
asyncio.ensure_future(self.block_cache.catch_up()),
asyncio.ensure_future(self.block_cache.process_cache()),
]
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
def on_signal(self, signame):
logging.warning('received {} signal, preparing to shut down'
.format(signame))
for task in self.tasks:
task.cancel()
def async_tasks(self):
return self.tasks
class BlockCache(object):
'''Requests blocks ahead of time from the daemon. Serves them
to the blockchain processor.'''
def __init__(self, env, db, rpc):
def __init__(self, env, db):
self.logger = logging.getLogger('BlockCache')
self.logger.setLevel(logging.INFO)
self.db = db
self.rpc = rpc
self.stop = False
self.rpc_url = env.rpc_url
# Cache target size is in MB. Has little effect on sync time.
self.cache_limit = 10
self.daemon_height = 0
self.fetched_height = db.db_height
self.fetched_height = db.height
# Blocks stored in reverse order. Next block is at end of list.
self.blocks = []
self.recent_sizes = []
self.ave_size = 0
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
def on_signal(self, signame):
logging.warning('Received {} signal, preparing to shut down'
.format(signame))
self.blocks = []
self.stop = True
self.logger.info('using RPC URL {}'.format(self.rpc_url))
async def process_cache(self):
while not self.stop:
while True:
await asyncio.sleep(1)
while self.blocks:
self.db.process_block(self.blocks.pop())
self.db.process_block(self.blocks.pop(), self.daemon_height)
# Release asynchronous block fetching
await asyncio.sleep(0)
self.db.flush()
async def catch_up(self):
self.logger.info('catching up, block cache limit {:d}MB...'
.format(self.cache_limit))
last_log = 0
prior_height = self.db.height
while await self.maybe_prefill():
now = time.time()
count = self.fetched_height - prior_height
if now > last_log + 15 and count:
last_log = now
prior_height = self.fetched_height
self.logger.info('prefilled {:,d} blocks to height {:,d} '
'daemon height: {:,d}'
.format(count, self.fetched_height,
self.daemon_height))
await asyncio.sleep(1)
if not self.stop:
try:
while await self.maybe_prefill():
await asyncio.sleep(1)
self.logger.info('caught up to height {:d}'
.format(self.daemon_height))
finally:
self.db.flush(self.daemon_height, True)
def cache_used(self):
return sum(len(block) for block in self.blocks)
@ -106,37 +95,28 @@ class BlockCache(object):
processing.'''
cache_limit = self.cache_limit * 1024 * 1024
while True:
if self.stop:
return False
cache_used = self.cache_used()
if cache_used > cache_limit:
return True
# Keep going by getting a whole new cache_limit of blocks
self.daemon_height = await self.rpc.rpc_single('getblockcount')
self.daemon_height = await self.send_single('getblockcount')
max_count = min(self.daemon_height - self.fetched_height, 4000)
count = min(max_count, self.prefill_count(cache_limit))
if not count or self.stop:
if not count:
return False # Done catching up
first = self.fetched_height + 1
param_lists = [[height] for height in range(first, first + count)]
hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
if self.stop:
return False
hashes = await self.send_vector('getblockhash', param_lists)
# Hashes is an array of hex strings
param_lists = [(h, False) for h in hashes]
blocks = await self.rpc.rpc_multi('getblock', param_lists)
blocks = await self.send_vector('getblock', param_lists)
self.fetched_height += count
if self.stop:
return False
# Convert hex string to bytes and put in memoryview
blocks = [memoryview(bytes.fromhex(block)) for block in blocks]
blocks = [bytes.fromhex(block) for block in blocks]
# Reverse order and place at front of list
self.blocks = list(reversed(blocks)) + self.blocks
@ -148,64 +128,47 @@ class BlockCache(object):
self.recent_sizes = self.recent_sizes[excess:]
self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
class RPC(object):
def __init__(self, env):
self.logger = logging.getLogger('RPC')
self.logger.setLevel(logging.INFO)
self.rpc_url = env.rpc_url
self.logger.info('using RPC URL {}'.format(self.rpc_url))
async def rpc_multi(self, method, param_lists):
payload = [{'method': method, 'params': param_list}
for param_list in param_lists]
while True:
dresults = await self.daemon(payload)
errs = [dresult['error'] for dresult in dresults]
if not any(errs):
return [dresult['result'] for dresult in dresults]
for err in errs:
if err.get('code') == -28:
self.logger.warning('daemon still warming up...')
secs = 10
break
else:
self.logger.error('daemon returned errors: {}'.format(errs))
secs = 0
self.logger.info('sleeping {:d} seconds and trying again...'
.format(secs))
await asyncio.sleep(secs)
async def rpc_single(self, method, params=None):
async def send_single(self, method, params=None):
payload = {'method': method}
if params:
payload['params'] = params
while True:
dresult = await self.daemon(payload)
err = dresult['error']
if not err:
return dresult['result']
if err.get('code') == -28:
self.logger.warning('daemon still warming up...')
secs = 10
else:
self.logger.error('daemon returned error: {}'.format(err))
secs = 0
self.logger.info('sleeping {:d} seconds and trying again...'
.format(secs))
await asyncio.sleep(secs)
async def daemon(self, payload):
result, = await self.send((payload, ))
return result
async def send_many(self, mp_pairs):
payload = [{'method': method, 'params': params}
for method, params in mp_pairs]
return await self.send(payload)
async def send_vector(self, method, params_list):
payload = [{'method': method, 'params': params}
for params in params_list]
return await self.send(payload)
async def send(self, payload):
assert isinstance(payload, (tuple, list))
data = json.dumps(payload)
while True:
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.rpc_url,
data=json.dumps(payload)) as resp:
return await resp.json()
async with aiohttp.request('POST', self.rpc_url,
data = data) as resp:
result = await resp.json()
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.error('aiohttp error: {}'.format(e))
self.logger.info('sleeping 1 second and trying again...')
await asyncio.sleep(1)
msg = 'aiohttp error: {}'.format(e)
secs = 3
else:
errs = tuple(item['error'] for item in result)
if not any(errs):
return tuple(item['result'] for item in result)
if any(err.get('code') == -28 for err in errs):
msg = 'daemon still warming up.'
secs = 30
else:
msg = 'daemon errors: {}'.format(errs)
secs = 3
self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs))
await asyncio.sleep(secs)

2
server_main.py

@ -28,6 +28,8 @@ def main_loop():
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.gather(*tasks))
except asyncio.CancelledError:
logging.warning('task cancelled; asyncio event loop closing')
finally:
loop.close()

Loading…
Cancel
Save