
Merge branch 'release-0.04'

Branch: master
Neil Booth, 8 years ago
Parent commit: ebc41e2e8b
22 changed files (changed lines in parentheses):

  1. HOWTO.rst (16)
  2. README.rst (24)
  3. RELEASE-NOTES (20)
  4. electrumx_rpc.py (11)
  5. lib/coins.py (78)
  6. lib/enum.py (23)
  7. lib/hash.py (44)
  8. lib/script.py (25)
  9. lib/tx.py (15)
  10. lib/util.py (19)
  11. query.py (14)
  12. samples/scripts/NOTES (23)
  13. server/ARCHITECTURE.rst (99)
  14. server/block_processor.py (282)
  15. server/cache.py (38)
  16. server/controller.py (158)
  17. server/daemon.py (33)
  18. server/env.py (12)
  19. server/protocol.py (525)
  20. server/storage.py (158)
  21. server/version.py (2)
  22. server_main.py (11)

HOWTO.rst (16)

@ -32,12 +32,12 @@ recommend having at least 30-40GB free space.
Database Engine
===============
You can choose between either RocksDB, LevelDB or LMDB to store transaction
information on disk. Currently, the fastest seems to be RocksDB with LevelDB
being about 10% slower. LMDB seems to be the slowest but maybe that's because
of bad implementation or configuration.
You can choose from RocksDB, LevelDB or LMDB to store transaction
information on disk. Currently, the fastest seems to be RocksDB with
LevelDB being about 10% slower. LMDB is slowest but that is because it
is not yet efficiently abstracted.
You will need to install either:
You will need to install one of:
+ `plyvel <https://plyvel.readthedocs.io/en/latest/installation.html>`_ for LevelDB
+ `pyrocksdb <http://pyrocksdb.readthedocs.io/en/v0.4/installation.html>`_ for RocksDB
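As a rough sketch, engine selection funnels through the storage
abstraction added in this release; ``open_db``, ``write_batch`` and the
engine names below are taken from this commit, while the database name
and keys are illustrative::

    from server.storage import open_db

    # Engine name is one of 'leveldb', 'rocksdb' or 'lmdb' (not case
    # sensitive); the matching Python package must be installed.
    db = open_db('Bitcoin-mainnet', 'leveldb')

    with db.write_batch() as batch:
        batch.put(b'key', b'value')   # writes are grouped into a batch
    print(db.get(b'key'))             # reads go straight to the DB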
@ -188,7 +188,7 @@ over the LAN from a bitcoind on machine B.
Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on
the same machine. HIST_MB of 350, UTXO_MB of 1,600.
the same machine. HIST_MB of 350, UTXO_MB of 1,600. LevelDB.
For chains other than bitcoin-mainnet synchronization should be much
faster.
@ -275,5 +275,5 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
timing out the connection while the disk flush was in progress. This
is harmless.
The ETA is just a guide and can be quite volatile. It is too optimistic
initially.
The ETA is just a guide and can be quite volatile, particularly around
flushes. It is too optimistic initially.

README.rst (24)

@ -68,26 +68,24 @@ Roadmap
=======
- test a few more performance improvement ideas
- handle client connections (half-implemented but not functional)
- handle the mempool
- implement light caching of client responses
- yield during expensive requests and/or penalize the connection
- improve DB abstraction so LMDB is not penalized
- continue to clean up the code and remove layering violations
- store all UTXOs, not just those with addresses
- implement IRC connectivity
- potentially move some functionality to C or C++
Once I get round to writing the server part, I will add DoS
protections if necessary to defend against requests for large
histories. However with asyncio it would not surprise me if ElectrumX
could smoothly serve the whole history of the biggest Satoshi dice
address with minimal negative impact on other connections; we shall
have to see. If the requestor is running the Electrum client I am
confident that it would collapse under the load far more quickly than
the server would; it is very inefficient at handling large wallets
and histories.
The above are in no particular order.
Database Format
===============
The database and metadata formats of ElectrumX are very likely to
change in the future which will render old DBs unusable. For now I do
not intend to provide converters as the rate of flux is high.
The database and metadata formats of ElectrumX are certain to change
in the future, which will render old DBs unusable. For now I do not
intend to provide converters as the rate of flux is high.
Miscellany

RELEASE-NOTES (20)

@ -1,3 +1,23 @@
Version 0.04
------------
- made the DB interface a little faster for LevelDB and RocksDB; this was
a small regression in 0.03
- fixed a bug that prevented block reorgs from working
- implement and enable client connectivity. This is not yet ready for
public use for several reasons. Local RPC, and remote TCP and SSL
connections are all supported in the same way as Electrum-server.
ElectrumX does not begin listening for incoming connections until it
has caught up with the daemon's height. The ports it is listening
on will appear in the logs when it starts. The complete
Electrum wire protocol is implemented, so it is now possible to use
it as a server for your own Electrum client. Note that mempools are
not yet handled, so unconfirmed transactions will not be notified or
shown; they will appear once they are included in a block. Also, no
responses are cached, so performance would likely degrade if used by
many clients. I welcome feedback on your experience using this.
Version 0.03
------------

electrumx_rpc.py (11)

@ -1,8 +1,15 @@
#!/usr/bin/env python3
# See the file "LICENSE" for information about the copyright
#
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Script to send RPC commands to a running ElectrumX server.'''
import argparse
import asyncio
import json

lib/coins.py (78)

@ -1,18 +1,29 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Module providing coin abstraction.
Anything coin-specific should go in this file and be subclassed where
necessary for appropriate handling.
'''
from decimal import Decimal
import inspect
import struct
import sys
from lib.hash import Base58, hash160, double_sha256
from lib.hash import Base58, hash160, double_sha256, hash_to_str
from lib.script import ScriptPubKey
from lib.tx import Deserializer
from lib.util import subclasses
class CoinError(Exception):
pass
'''Exception raised for coin-related errors.'''
class Coin(object):
@ -22,19 +33,14 @@ class Coin(object):
HEADER_LEN = 80
DEFAULT_RPC_PORT = 8332
VALUE_PER_COIN = 100000000
@staticmethod
def coins():
is_coin = lambda obj: (inspect.isclass(obj)
and issubclass(obj, Coin)
and obj != Coin)
pairs = inspect.getmembers(sys.modules[__name__], is_coin)
# Returned in the order they appear in this file
return [pair[1] for pair in pairs]
CHUNK_SIZE = 2016
@classmethod
def lookup_coin_class(cls, name, net):
for coin in cls.coins():
'''Return a coin class given name and network.
Raise an exception if unrecognised.'''
for coin in subclasses(Coin):
if (coin.NAME.lower() == name.lower()
and coin.NET.lower() == net.lower()):
return coin
@ -43,13 +49,14 @@ class Coin(object):
@staticmethod
def lookup_xverbytes(verbytes):
'''Return a (is_xpub, coin_class) pair given xpub/xprv verbytes.'''
# Order means BTC testnet will override NMC testnet
for coin in Coin.coins():
for coin in Coin.coin_classes():
if verbytes == coin.XPUB_VERBYTES:
return True, coin
if verbytes == coin.XPRV_VERBYTES:
return False, coin
raise CoinError("version bytes unrecognised")
raise CoinError('version bytes unrecognised')
@classmethod
def address_to_hash168(cls, addr):
@ -62,6 +69,11 @@ class Coin(object):
raise CoinError('invalid address: {}'.format(addr))
return result
@classmethod
def hash168_to_address(cls, hash168):
'''Return an address given a 21-byte hash.'''
return Base58.encode_check(hash168)
@classmethod
def P2PKH_address_from_hash160(cls, hash_bytes):
'''Return a P2PKH address given a public key.'''
@ -129,7 +141,7 @@ class Coin(object):
@classmethod
def prvkey_WIF(cls, privkey_bytes, compressed):
"Return the private key encoded in Wallet Import Format."
'''Return the private key encoded in Wallet Import Format.'''
payload = bytearray([cls.WIF_BYTE]) + privkey_bytes
if compressed:
payload.append(0x01)
@ -137,20 +149,39 @@ class Coin(object):
@classmethod
def header_hashes(cls, header):
'''Given a header return the previous block hash and the current block
hash.'''
'''Given a header return the previous and current block hashes.'''
return header[4:36], double_sha256(header)
@classmethod
def read_block(cls, block):
'''Read a block and return (header, tx_hashes, txs)'''
'''Return a tuple (header, tx_hashes, txs) given a raw block.'''
header, rest = block[:cls.HEADER_LEN], block[cls.HEADER_LEN:]
return (header, ) + Deserializer(rest).read_block()
@classmethod
def decimal_value(cls, value):
'''Return the number of standard coin units as a Decimal given a
quantity of smallest units.
For example 1 BTC is returned for 100 million satoshis.
'''
return Decimal(value) / cls.VALUE_PER_COIN
@classmethod
def electrum_header(cls, header, height):
version, = struct.unpack('<I', header[:4])
timestamp, bits, nonce = struct.unpack('<III', header[68:80])
return {
'block_height': height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}
class Bitcoin(Coin):
NAME = "Bitcoin"
@ -167,6 +198,7 @@ class Bitcoin(Coin):
TX_COUNT_HEIGHT = 420976
TX_PER_BLOCK = 1600
class BitcoinTestnet(Coin):
NAME = "Bitcoin"
SHORTNAME = "XTN"
@ -177,6 +209,7 @@ class BitcoinTestnet(Coin):
P2SH_VERBYTE = 0xc4
WIF_BYTE = 0xef
# Source: pycoin and others
class Litecoin(Coin):
NAME = "Litecoin"
@ -188,6 +221,7 @@ class Litecoin(Coin):
P2SH_VERBYTE = 0x05
WIF_BYTE = 0xb0
class LitecoinTestnet(Coin):
NAME = "Litecoin"
SHORTNAME = "XLT"
@ -198,6 +232,7 @@ class LitecoinTestnet(Coin):
P2SH_VERBYTE = 0xc4
WIF_BYTE = 0xef
# Source: namecoin.org
class Namecoin(Coin):
NAME = "Namecoin"
@ -209,6 +244,7 @@ class Namecoin(Coin):
P2SH_VERBYTE = 0x0d
WIF_BYTE = 0xe4
class NamecoinTestnet(Coin):
NAME = "Namecoin"
SHORTNAME = "XNM"
@ -219,6 +255,7 @@ class NamecoinTestnet(Coin):
P2SH_VERBYTE = 0xc4
WIF_BYTE = 0xef
# For DOGE there is disagreement across sites like bip32.org and
# pycoin. Taken from bip32.org and bitmerchant on github
class Dogecoin(Coin):
@ -231,6 +268,7 @@ class Dogecoin(Coin):
P2SH_VERBYTE = 0x16
WIF_BYTE = 0x9e
class DogecoinTestnet(Coin):
NAME = "Dogecoin"
SHORTNAME = "XDT"
@ -241,6 +279,7 @@ class DogecoinTestnet(Coin):
P2SH_VERBYTE = 0xc4
WIF_BYTE = 0xf1
# Source: pycoin
class Dash(Coin):
NAME = "Dash"
@ -252,6 +291,7 @@ class Dash(Coin):
P2SH_VERBYTE = 0x10
WIF_BYTE = 0xcc
class DashTestnet(Coin):
NAME = "Dogecoin"
SHORTNAME = "tDASH"
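A brief usage sketch of the classmethods added above; the lookup
arguments match the NAME and NET attributes of the coin classes, and
``raw_header`` stands in for 80 bytes read from the headers file::

    from lib.coins import Coin

    coin = Coin.lookup_coin_class('Bitcoin', 'mainnet')
    print(coin.decimal_value(100000000))   # Decimal('1'): one whole BTC

    # Convert a raw 80-byte header into the dict Electrum clients expect.
    # fields = coin.electrum_header(raw_header, height)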

lib/enum.py (23)

@ -1,8 +1,17 @@
# enum-like type
# From the Python Cookbook from http://code.activestate.com/recipes/67107/
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''An enum-like type with reverse lookup.

Source: Python Cookbook, http://code.activestate.com/recipes/67107/
'''

class EnumException(Exception):

class EnumError(Exception):
    pass
@ -20,13 +29,13 @@ class Enumeration:
if isinstance(x, tuple):
x, i = x
if not isinstance(x, str):
raise EnumException("enum name {} not a string".format(x))
raise EnumError("enum name {} not a string".format(x))
if not isinstance(i, int):
raise EnumException("enum value {} not an integer".format(i))
raise EnumError("enum value {} not an integer".format(i))
if x in uniqueNames:
raise EnumException("enum name {} not unique".format(x))
raise EnumError("enum name {} not unique".format(x))
if i in uniqueValues:
raise EnumException("enum value {} not unique".format(x))
raise EnumError("enum value {} not unique".format(x))
uniqueNames.add(x)
uniqueValues.add(i)
lookup[x] = i
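For illustration, the renamed Enumeration type is used like the OpCodes
table in lib/script.py: plain names auto-number from zero, while
(name, value) tuples set explicit values. The ``whatis`` reverse lookup
is assumed from the cookbook recipe this module is based on::

    from lib.enum import Enumeration

    Fruit = Enumeration('Fruit', ['apple', 'pear', ('banana', 10)])
    print(Fruit.apple, Fruit.pear)   # 0 1
    print(Fruit.whatis(10))          # 'banana'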

lib/hash.py (44)

@ -1,6 +1,13 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Cryptographic hash functions and related classes.'''
import hashlib
import hmac
@ -8,11 +15,13 @@ from lib.util import bytes_to_int, int_to_bytes
def sha256(x):
'''Simple wrapper of hashlib sha256.'''
assert isinstance(x, (bytes, bytearray, memoryview))
return hashlib.sha256(x).digest()
def ripemd160(x):
'''Simple wrapper of hashlib ripemd160.'''
assert isinstance(x, (bytes, bytearray, memoryview))
h = hashlib.new('ripemd160')
h.update(x)
@ -20,36 +29,41 @@ def ripemd160(x):
def double_sha256(x):
'''SHA-256 of SHA-256, as used extensively in bitcoin.'''
return sha256(sha256(x))
def hmac_sha512(key, msg):
'''Use SHA-512 to provide an HMAC.'''
return hmac.new(key, msg, hashlib.sha512).digest()
def hash160(x):
'''RIPEMD-160 of SHA-256.
Used to make bitcoin addresses from pubkeys.'''
return ripemd160(sha256(x))
def hash_to_str(x):
'''Converts a big-endian binary hash to a little-endian hex string, as
shown in block explorers, etc.
'''Convert a big-endian binary hash to displayed hex string.
Display form of a binary hash is reversed and converted to hex.
'''
return bytes(reversed(x)).hex()
def hex_str_to_hash(x):
'''Converts a little-endian hex string as shown to a big-endian binary
hash.'''
'''Convert a displayed hex string to a binary hash.'''
return bytes(reversed(bytes.fromhex(x)))
class InvalidBase58String(Exception):
pass
class InvalidBase58CheckSum(Exception):
pass
class Base58Error(Exception):
'''Exception used for Base58 errors.'''
class Base58(object):
'''Class providing base 58 functionality.'''
chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
assert len(chars) == 58
@ -59,17 +73,17 @@ class Base58(object):
def char_value(c):
val = Base58.cmap.get(c)
if val is None:
raise InvalidBase58String
raise Base58Error('invalid base 58 character "{}"'.format(c))
return val
@staticmethod
def decode(txt):
"""Decodes txt into a big-endian bytearray."""
if not isinstance(txt, str):
raise InvalidBase58String("a string is required")
raise Base58Error('a string is required')
if not txt:
raise InvalidBase58String("string cannot be empty")
raise Base58Error('string cannot be empty')
value = 0
for c in txt:
@ -112,14 +126,14 @@ class Base58(object):
be_bytes = Base58.decode(txt)
result, check = be_bytes[:-4], be_bytes[-4:]
if check != double_sha256(result)[:4]:
raise InvalidBase58CheckSum
raise Base58Error('invalid base 58 checksum for {}'.format(txt))
return result
@staticmethod
def encode_check(payload):
"""Encodes a payload bytearray (which includes the version byte(s))
into a Base58Check string."""
assert isinstance(payload, (bytes, bytearray))
assert isinstance(payload, (bytes, bytearray, memoryview))
be_bytes = payload + double_sha256(payload)[:4]
return Base58.encode(be_bytes)
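The display-form helpers and Base58Check round-trip as shown below; the
payload is an illustrative version byte plus a made-up 20-byte hash160::

    from lib.hash import double_sha256, hash_to_str, hex_str_to_hash, Base58

    h = double_sha256(b'electrumx')
    s = hash_to_str(h)               # reversed hex, as block explorers show
    assert hex_str_to_hash(s) == h   # round-trips back to the binary hash

    payload = bytes([0x05]) + bytes(range(20))
    addr = Base58.encode_check(payload)
    assert Base58.decode_check(addr) == payload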

lib/script.py (25)

@ -1,6 +1,13 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Script-related classes and functions.'''
from binascii import hexlify
import struct
@ -10,7 +17,7 @@ from lib.util import cachedproperty
class ScriptError(Exception):
pass
'''Exception used for script errors.'''
OpCodes = Enumeration("Opcodes", [
@ -52,7 +59,9 @@ assert OpCodes.OP_CHECKMULTISIG == 0xae
class ScriptSig(object):
'''A script from a tx input, typically provides one or more signatures.'''
'''A script from a tx input.
Typically provides one or more signatures.'''
SIG_ADDRESS, SIG_MULTI, SIG_PUBKEY, SIG_UNKNOWN = range(4)
@ -73,8 +82,9 @@ class ScriptSig(object):
@classmethod
def from_script(cls, script, coin):
'''Returns an instance of this class. Unrecognised scripts return
an object of kind SIG_UNKNOWN.'''
'''Return an instance of this class.
Return an object with kind SIG_UNKNOWN for unrecognised scripts.'''
try:
return cls.parse_script(script, coin)
except ScriptError:
@ -82,8 +92,9 @@ class ScriptSig(object):
@classmethod
def parse_script(cls, script, coin):
'''Returns an instance of this class. Raises on unrecognised
scripts.'''
'''Return an instance of this class.
Raises on unrecognised scripts.'''
ops, datas = Script.get_ops(script)
# Address, PubKey and P2SH redeems only push data

lib/tx.py (15)

@ -1,6 +1,13 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Transaction-related classes and functions.'''
from collections import namedtuple
import struct
@ -9,6 +16,7 @@ from lib.hash import double_sha256, hash_to_str
class Tx(namedtuple("Tx", "version inputs outputs locktime")):
'''Class representing a transaction.'''
@cachedproperty
def is_coinbase(self):
@ -17,6 +25,7 @@ class Tx(namedtuple("Tx", "version inputs outputs locktime")):
# FIXME: add hash as a cached property?
class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
'''Class representing a transaction input.'''
ZERO = bytes(32)
MINUS_1 = 4294967295
@ -41,6 +50,7 @@ class TxInput(namedtuple("TxInput", "prev_hash prev_idx script sequence")):
class TxOutput(namedtuple("TxOutput", "value pk_script")):
'''Class representing a transaction output.'''
@cachedproperty
def pay_to(self):
@ -48,9 +58,10 @@ class TxOutput(namedtuple("TxOutput", "value pk_script")):
class Deserializer(object):
'''Deserializes blocks into transactions.'''
def __init__(self, binary):
assert isinstance(binary, (bytes, memoryview))
assert isinstance(binary, bytes)
self.binary = binary
self.cursor = 0

lib/util.py (19)

@ -1,7 +1,15 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Miscellaneous utility classes and functions.'''
import array
import inspect
import logging
import sys
from collections import Container, Mapping
@ -70,8 +78,17 @@ def deep_getsizeof(obj):
return size(obj)
def subclasses(base_class, strict=True):
'''Return a list of subclasses of base_class in its module.'''
def select(obj):
return (inspect.isclass(obj) and issubclass(obj, base_class)
and (not strict or obj != base_class))
pairs = inspect.getmembers(sys.modules[base_class.__module__], select)
return [pair[1] for pair in pairs]
def chunks(items, size):
'''Break up items, an iterable, into chunks of length size.'''
for i in range(0, len(items), size):
yield items[i: i + size]
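Both helpers are easy to exercise standalone; ``subclasses`` is what
lookup_coin_class in lib/coins.py now iterates over::

    from lib.util import chunks, subclasses
    from lib.coins import Coin

    print(list(chunks([1, 2, 3, 4, 5], 2)))
    # [[1, 2], [3, 4], [5]]

    print([coin.__name__ for coin in subclasses(Coin)])
    # ['Bitcoin', 'BitcoinTestnet', 'Dash', ...]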

query.py (14)

@ -1,8 +1,18 @@
#!/usr/bin/env python3
# See the file "LICENSE" for information about the copyright
#
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Script to query the database for debugging purposes.
Not currently documented; might become easier to use in future.
'''
import os
import sys

samples/scripts/NOTES (23)

@ -1,8 +1,8 @@
The following environment variables are required:
DB_DIRECTORY - path to the database directory (if relative, to run script)
USERNAME - the username the server will run as
SERVER_MAIN - path to the server_main.py script (if relative, to run script)
DB_DIRECTORY - path to the database directory (if relative, to `run` script)
USERNAME - the username the server will run as if using `run` script
SERVER_MAIN - path to the server_main.py script (if relative, to `run` script)
DAEMON_URL - the URL used to connect to the daemon. Should be of the form
http://username:password@hostname:port/
Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
@ -14,10 +14,22 @@ sensible defaults if not specified.
COIN - see lib/coins.py, must be a coin NAME. Defaults to Bitcoin.
NETWORK - see lib/coins.py, must be a coin NET. Defaults to mainnet.
DB_ENGINE - database engine for the transaction database. Default is
leveldb. Supported alternatives are rocksdb and lmdb.
You will need to install the appropriate python packages.
Not case sensitive.
REORG_LIMIT - maximum number of blocks to be able to handle in a chain
reorganisation. ElectrumX retains some fairly compact
undo information for this many blocks in levelDB.
Default is 200.
TCP_PORT - if set will serve Electrum clients on that port
SSL_PORT - if set will serve Electrum clients over SSL on that port.
If set SSL_CERTFILE and SSL_KEYFILE must be filesystem paths
RPC_PORT - Listen on this port for local RPC connections, defaults to
8000.
BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client.
DONATION_ADDRESS - server donation address. Defaults to none.
Your performance might change by tweaking these cache settings. Cache
size is only checked roughly every minute, so the caches can grow
@ -41,8 +53,3 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before
leveldb caching and Python GC effects. However this may be
very dependent on hardware and you may have different
results.
DB_ENGINE - database engine for the transaction database. Default is
leveldb. Supported alternatives are rocksdb and lmdb,
which will require installation of the appropriate python
packages.

server/ARCHITECTURE.rst (99)

@ -0,0 +1,99 @@
Components
==========
The components of the server are roughly like this::
                    -------
                    - Env -
                    -------

                    -------
                    - IRC -
                    -------
                       <
 -------------      ------------
 - ElectrumX -<<<<<<- LocalRPC -
 -------------      ------------
     <     >
 ----------      -------------------      ----------
 - Daemon -<<<<<<- Block processor ->>>>>>- Caches -
 ----------      -------------------      ----------
     <              <             >           <
 --------------                          -----------
 - Prefetcher -                          - Storage -
 --------------                          -----------
Env
---
Holds configuration taken from the environment. Handles defaults
appropriately. Generally passed to the constructor of other
components which take their settings from it.
LocalRPC
--------
Handles local JSON RPC connections querying ElectrumX server state.
Not started until the block processor has caught up with the daemon.
ElectrumX
---------
Handles JSON Electrum client connections over TCP or SSL. One
instance per client session. Should be the only component concerned
with the details of the Electrum wire protocol. Responsible for
caching of client responses. Not started until the block processor
has caught up with the daemon. Logically, if not yet in practice, a
coin-specific class.
Daemon
------
Used by the block processor, ElectrumX servers and prefetcher.
Encapsulates the daemon RPC wire protocol. Logically, if not yet in
practice, a coin-specific class.
Block Processor
---------------
Responsible for managing block chain state (UTXO set, history,
transaction and undo information) and processing towards the chain
tip. Uses the caches for in-memory state caching. Flushes state to
the storage layer. Responsible for handling block chain
reorganisations. Once caught up maintains a representation of daemon
mempool state.
Caches
------
The file system cache and the UTXO cache are implementation details of
the block processor; nothing else should interface with them.
Storage
-------
Backend database abstraction. Along with the host filesystem, used by
the block processor (and therefore its caches) to store chain state.
Prefetcher
----------
Used by the block processor to asynchronously prefetch blocks from the
daemon. Holds the fetched block height. Once it has caught up it
additionally obtains daemon mempool tx hashes. Serves blocks and
mempool hashes to the block processor via a queue.
IRC
---
Not currently implemented; will handle IRC communication for the
ElectrumX servers.
Controller
----------
A historical artefact that currently coordinates some of the above
components. Not pictured as it doesn't seem to have a logical
place and so is probably going away.
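The prefetcher/block-processor relationship described above is a plain
asyncio producer and consumer joined by a queue. A minimal sketch of
the pattern, with illustrative names rather than the real classes::

    import asyncio

    async def prefetcher(queue):
        '''Producer: stand-in for fetching raw blocks from the daemon.'''
        height = 0
        while True:
            blocks = ['block {}'.format(h) for h in range(height, height + 10)]
            height += len(blocks)
            queue.put_nowait((blocks, height))
            await asyncio.sleep(5)    # poll the daemon periodically

    async def block_processor(queue):
        '''Consumer: wakes only when the prefetcher delivers work.'''
        while True:
            blocks, height = await queue.get()
            print('processing {} blocks to height {}'
                  .format(len(blocks), height))

    queue = asyncio.Queue()
    loop = asyncio.get_event_loop()
    # Runs forever, like the real server loops:
    loop.run_until_complete(asyncio.gather(prefetcher(queue),
                                           block_processor(queue)))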

server/block_processor.py (282)

@ -1,6 +1,13 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Block prefetcher and chain processor.'''
import array
import ast
import asyncio
@ -15,7 +22,7 @@ from server.daemon import DaemonError
from lib.hash import hash_to_str
from lib.script import ScriptPubKey
from lib.util import chunks, LoggedClass
from server.storage import LMDB, RocksDB, LevelDB, NoDatabaseException
from server.storage import open_db
def formatted_time(t):
@ -42,16 +49,12 @@ class Prefetcher(LoggedClass):
self.semaphore = asyncio.Semaphore()
self.queue = asyncio.Queue()
self.queue_size = 0
self.fetched_height = height
self.mempool = []
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
self.fetched_height = height
self.recent_sizes = [0]
async def get_blocks(self):
'''Returns a list of prefetched blocks.'''
blocks, total_size = await self.queue.get()
self.queue_size -= total_size
return blocks
# First fetch to be 10 blocks
self.ave_size = self.target_cache_size // 10
async def clear(self, height):
'''Clear prefetched blocks and restart from the given height.
@ -66,49 +69,73 @@ class Prefetcher(LoggedClass):
self.queue_size = 0
self.fetched_height = height
async def get_blocks(self):
'''Returns a list of prefetched blocks and the mempool.'''
blocks, height, size = await self.queue.get()
self.queue_size -= size
if height == self.daemon.cached_height():
return blocks, self.mempool
else:
return blocks, None
async def start(self):
'''Loop forever polling for more blocks.'''
self.logger.info('looping forever prefetching blocks...')
self.logger.info('starting prefetch loop...')
while True:
try:
with await self.semaphore:
count = await self._prefetch()
if not count:
if await self._caught_up():
await asyncio.sleep(5)
else:
await asyncio.sleep(0)
except DaemonError as e:
self.logger.info('ignoring daemon errors: {}'.format(e))
def _prefill_count(self, room):
ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
count = room // ave_size if ave_size else 0
return max(count, 10)
async def _caught_up(self):
'''Poll for new blocks and mempool state.
Mempool is only queried if caught up with daemon.'''
with await self.semaphore:
blocks, size = await self._prefetch()
self.fetched_height += len(blocks)
caught_up = self.fetched_height == self.daemon.cached_height()
if caught_up:
self.mempool = await self.daemon.mempool_hashes()
# Wake up block processor if we have something
if blocks or caught_up:
self.queue.put_nowait((blocks, self.fetched_height, size))
self.queue_size += size
return caught_up
async def _prefetch(self):
'''Prefetch blocks if there are any to prefetch.'''
'''Prefetch blocks unless the prefetch queue is full.'''
if self.queue_size >= self.target_cache_size:
return [], 0
daemon_height = await self.daemon.height()
max_count = min(daemon_height - self.fetched_height, 4000)
count = min(max_count, self._prefill_count(self.target_cache_size))
cache_room = self.target_cache_size // self.ave_size
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
if not count:
return 0
return [], 0
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if not hex_hashes:
self.logger.error('requested {:,d} hashes, got none'.format(count))
return 0
blocks = await self.daemon.raw_blocks(hex_hashes)
sizes = [len(block) for block in blocks]
total_size = sum(sizes)
self.queue.put_nowait((blocks, total_size))
self.queue_size += total_size
self.fetched_height += len(blocks)
# Keep 50 most recent block sizes for fetch count estimation
self.recent_sizes.extend(sizes)
excess = len(self.recent_sizes) - 50
if excess > 0:
self.recent_sizes = self.recent_sizes[excess:]
return count
size = sum(len(block) for block in blocks)
# Update our recent average block size estimate
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
return blocks, size
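The block-size estimate above is a simple blend: a fetch of ten or more
blocks replaces the average outright, while smaller fetches are padded
with the previous estimate. As a standalone function::

    def update_ave_size(ave_size, size, count):
        '''Fold a fetch of `count` blocks totalling `size` bytes into
        the running per-block size estimate.'''
        if count >= 10:
            return size // count
        return (size + (10 - count) * ave_size) // 10

    # e.g. two 250KB blocks barely move a 1MB estimate:
    print(update_ave_size(1000000, 500000, 2))   # 850000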
class BlockProcessor(LoggedClass):
@ -118,30 +145,33 @@ class BlockProcessor(LoggedClass):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon):
def __init__(self, env, daemon, on_update=None):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.
'''
super().__init__()
self.daemon = daemon
self.on_update = on_update
# Meta
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.coin = env.coin
self.caught_up = False
self.reorg_limit = env.reorg_limit
# Chain state (initialize to genesis in case of new DB)
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin, env.db_engine)
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.init_state()
self.tx_count = self.db_tx_count
self.height = self.db_height
self.tip = self.db_tip
@ -150,7 +180,6 @@ class BlockProcessor(LoggedClass):
# entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.backup_hash168s = set()
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.prefetcher = Prefetcher(daemon, self.height)
@ -158,8 +187,9 @@ class BlockProcessor(LoggedClass):
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
# Redirected member func
# Redirected member funcs
self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@ -187,36 +217,42 @@ class BlockProcessor(LoggedClass):
async def start(self):
'''External entry point for block processing.
A simple wrapper that safely flushes the DB on clean
shutdown.
Safely flushes the DB on clean shutdown.
'''
try:
await self.advance_blocks()
while True:
await self._wait_for_update()
await asyncio.sleep(0) # Yield
finally:
self.flush(True)
async def advance_blocks(self):
'''Loop forever processing blocks in the forward direction.'''
while True:
blocks = await self.prefetcher.get_blocks()
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks or a mempool update.
Blocks are only processed in the forward direction. The
prefetcher only provides a non-None mempool when caught up.
'''
all_touched = []
blocks, mempool = await self.prefetcher.get_blocks()
for block in blocks:
if not self.advance_block(block):
await self.handle_chain_reorg()
self.caught_up = False
touched = self.advance_block(block)
if touched is None:
all_touched.append(await self.handle_chain_reorg())
mempool = None
break
all_touched.append(touched)
await asyncio.sleep(0) # Yield
if self.height != self.daemon.cached_height():
continue
if not self.caught_up:
self.caught_up = True
self.logger.info('caught up to height {:,d}'
.format(self.height))
# Flush everything when in caught-up state as queries
# are performed on DB not in-memory
if mempool is not None:
# Caught up to daemon height. Flush everything as queries
# are performed on the DB and not in-memory.
self.flush(True)
if self.first_sync:
self.first_sync = False
self.logger.info('synced to height {:,d}'.format(self.height))
if self.on_update:
await self.on_update(self.height, set.union(*all_touched))
async def force_chain_reorg(self, to_genesis):
try:
@ -229,16 +265,21 @@ class BlockProcessor(LoggedClass):
self.logger.info('chain reorg detected')
self.flush(True)
self.logger.info('finding common height...')
touched = set()
hashes = await self.reorg_hashes(to_genesis)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks)
touched.update(self.backup_blocks(blocks))
self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
return touched
async def reorg_hashes(self, to_genesis):
'''Return the list of hashes to back up because of a reorg.
@ -253,7 +294,6 @@ class BlockProcessor(LoggedClass):
start = self.height - 1
count = 1
while start > 0:
self.logger.info('start: {:,d} count: {:,d}'.format(start, count))
hashes = self.fs_cache.block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
@ -273,28 +313,17 @@ class BlockProcessor(LoggedClass):
return self.fs_cache.block_hashes(start, count)
def open_db(self, coin, db_engine):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
db_engine_class = {
"leveldb": LevelDB,
"rocksdb": RocksDB,
"lmdb": LMDB
}[db_engine.lower()]
try:
db = db_engine_class(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except NoDatabaseException:
db = db_engine_class(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
self.logger.info('created new {} database {}'.format(db_engine, db_name))
def init_state(self):
if self.db.is_new:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.flush_count = 0
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
self.logger.info('successfully opened {} database {}'.format(db_engine, db_name))
self.read_state(db)
return db
def read_state(self, db):
state = db.get(b'state')
state = self.db.get(b'state')
state = ast.literal_eval(state.decode())
if state['genesis'] != self.coin.GENESIS_HASH:
raise ChainError('DB genesis hash {} does not match coin {}'
@ -358,8 +387,6 @@ class BlockProcessor(LoggedClass):
def flush_state(self, batch):
'''Flush chain state to the batch.'''
if self.caught_up:
self.first_sync = False
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
@ -392,14 +419,13 @@ class BlockProcessor(LoggedClass):
assert not self.history
assert not self.utxo_cache.cache
assert not self.utxo_cache.db_cache
assert not self.backup_hash168s
def flush(self, flush_utxos=False):
def flush(self, flush_utxos=False, flush_history=None):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height:
self.logger.info('nothing to flush')
assert flush_history is None
self.assert_flushed()
return
@ -413,15 +439,14 @@ class BlockProcessor(LoggedClass):
# matter. But if writing the files fails we do not want to
# have updated the DB.
if self.height > self.db_height:
assert flush_history is None
flush_history = self.flush_history
self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
if self.height > self.db_height:
self.flush_history(batch)
else:
self.backup_history(batch)
flush_history(batch)
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
@ -457,7 +482,6 @@ class BlockProcessor(LoggedClass):
def flush_history(self, batch):
self.logger.info('flushing history')
assert not self.backup_hash168s
self.flush_count += 1
flush_id = struct.pack('>H', self.flush_count)
@ -472,20 +496,20 @@ class BlockProcessor(LoggedClass):
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def backup_history(self, batch):
def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
# Drop any NO_CACHE entry
self.backup_hash168s.discard(NO_CACHE_ENTRY)
hash168s.discard(NO_CACHE_ENTRY)
assert not self.history
nremoves = 0
for hash168 in sorted(self.backup_hash168s):
for hash168 in sorted(hash168s):
prefix = b'H' + hash168
deletes = []
puts = {}
for key, hist in self.db.iterator(reverse=True, prefix=prefix):
for key, hist in self.db.iterator(prefix=prefix, reverse=True):
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= self.tx_count
@ -502,8 +526,7 @@ class BlockProcessor(LoggedClass):
batch.put(key, value)
self.logger.info('removed {:,d} history entries from {:,d} addresses'
.format(nremoves, len(self.backup_hash168s)))
self.backup_hash168s = set()
.format(nremoves, len(hash168s)))
def cache_sizes(self):
'''Returns the approximate size of the cache, in MB.'''
@ -547,14 +570,15 @@ class BlockProcessor(LoggedClass):
# the UTXO cache uses the fs_cache via get_tx_hash() to
# resolve compressed key collisions
header, tx_hashes, txs = self.coin.read_block(block)
self.fs_cache.advance_block(header, tx_hashes, txs)
prev_hash, header_hash = self.coin.header_hashes(header)
if prev_hash != self.tip:
return False
return None
touched = set()
self.fs_cache.advance_block(header, tx_hashes, txs)
self.tip = header_hash
self.height += 1
undo_info = self.advance_txs(tx_hashes, txs)
undo_info = self.advance_txs(tx_hashes, txs, touched)
if self.daemon.cached_height() - self.height <= self.reorg_limit:
self.write_undo_info(self.height, b''.join(undo_info))
@ -566,9 +590,9 @@ class BlockProcessor(LoggedClass):
if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB:
self.flush(utxo_MB >= self.utxo_MB)
return True
return touched
def advance_txs(self, tx_hashes, txs):
def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
undo_info = []
@ -605,6 +629,7 @@ class BlockProcessor(LoggedClass):
for hash168 in hash168s:
history[hash168].append(tx_num)
self.history_size += len(hash168s)
touched.update(hash168s)
tx_num += 1
self.tx_count = tx_num
@ -620,6 +645,7 @@ class BlockProcessor(LoggedClass):
self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed()
touched = set()
for block in blocks:
header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
@ -628,15 +654,18 @@ class BlockProcessor(LoggedClass):
.format(hash_to_str(header_hash),
hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs)
self.backup_txs(tx_hashes, txs, touched)
self.fs_cache.backup_block()
self.tip = prev_hash
self.height -= 1
self.logger.info('backed up to height {:,d}'.format(self.height))
self.flush(True)
def backup_txs(self, tx_hashes, txs):
flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history)
return touched
def backup_txs(self, tx_hashes, txs, touched):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
@ -646,7 +675,6 @@ class BlockProcessor(LoggedClass):
pack = struct.pack
put_utxo = self.utxo_cache.put
spend_utxo = self.utxo_cache.spend
hash168s = self.backup_hash168s
rtxs = reversed(txs)
rtx_hashes = reversed(tx_hashes)
@ -655,7 +683,7 @@ class BlockProcessor(LoggedClass):
# Spend the outputs
for idx, txout in enumerate(tx.outputs):
cache_value = spend_utxo(tx_hash, idx)
hash168s.add(cache_value[:21])
touched.add(cache_value[:21])
# Restore the inputs
if not tx.is_coinbase:
@ -664,7 +692,7 @@ class BlockProcessor(LoggedClass):
undo_item = undo_info[n:n + 33]
put_utxo(txin.prev_hash + pack('<H', txin.prev_idx),
undo_item)
hash168s.add(undo_item[:21])
touched.add(undo_item[:21])
assert n == 0
self.tx_count -= len(txs)
@ -724,6 +752,12 @@ class BlockProcessor(LoggedClass):
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def get_current_header(self):
'''Returns the current header as a dictionary.'''
return self.fs_cache.encode_header(self.height)
def get_utxo_hash168(self, tx_hash, index):
'''Returns the hash168 for a UTXO.'''
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.utxo_cache.hash168(tx_hash, idx_packed)
if hash168 == NO_CACHE_ENTRY:
hash168 = None
return hash168

server/cache.py (38)

@ -1,6 +1,17 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''UTXO and file cache.
During initial sync these caches accumulate data and flush only
occasionally. Once synced, flushes are performed after processing
each block.
'''
import array
import itertools
import os
@ -98,7 +109,7 @@ class UTXOCache(LoggedClass):
return value
# Oh well. Find and remove it from the DB.
hash168 = self.hash168(prev_hash, idx_packed)
hash168 = self.hash168(prev_hash, idx_packed, True)
if not hash168:
return NO_CACHE_ENTRY
@ -133,7 +144,7 @@ class UTXOCache(LoggedClass):
raise Exception('could not resolve UTXO key collision')
def hash168(self, tx_hash, idx_packed):
def hash168(self, tx_hash, idx_packed, delete=False):
'''Return the hash168 paid to by the given TXO.
Refers to the database. Returns None if not found (which is
@ -147,6 +158,7 @@ class UTXOCache(LoggedClass):
return None
if len(data) == 25:
if delete:
self.cache_delete(key)
return data[:21]
@ -157,6 +169,7 @@ class UTXOCache(LoggedClass):
(tx_num, ) = struct.unpack('<I', data[n+21:n+25])
my_hash, height = self.parent.get_tx_hash(tx_num)
if my_hash == tx_hash:
if delete:
self.cache_write(key, data[:n] + data[n+25:])
return data[n:n+21]
@ -379,22 +392,3 @@ class FSCache(LoggedClass):
headers = self.read_headers(height, count)
hlen = self.coin.HEADER_LEN
return [double_sha256(header) for header in chunks(headers, hlen)]
def encode_header(self, height):
if height < 0 or height > self.height + len(self.headers):
raise Exception('no header information for height {:,d}'
.format(height))
header = self.read_headers(self.height, 1)
unpack = struct.unpack
version, = unpack('<I', header[:4])
timestamp, bits, nonce = unpack('<III', header[68:80])
return {
'block_height': self.height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}

server/controller.py (158)

@ -1,16 +1,25 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Server controller.
Coordinates the parts of the server. Serves as a cache for
client-serving data such as histories.
'''
import asyncio
import signal
import ssl
import traceback
from functools import partial
from server.daemon import Daemon, DaemonError
from server.daemon import Daemon
from server.block_processor import BlockProcessor
from server.protocol import ElectrumX, LocalRPC
from lib.hash import (sha256, double_sha256, hash_to_str,
Base58, hex_str_to_hash)
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.util import LoggedClass
@ -24,55 +33,71 @@ class Controller(LoggedClass):
super().__init__()
self.loop = loop
self.env = env
self.coin = env.coin
self.daemon = Daemon(env.daemon_url)
self.block_processor = BlockProcessor(env, self.daemon)
self.block_processor = BlockProcessor(env, self.daemon,
on_update=self.on_update)
JSONRPC.init(self.block_processor, self.daemon, self.coin,
self.add_job)
self.servers = []
self.sessions = set()
self.addresses = {}
self.jobs = set()
self.peers = {}
self.jobs = asyncio.Queue()
def start(self):
'''Prime the event loop with asynchronous servers and jobs.'''
env = self.env
loop = self.loop
'''Prime the event loop with asynchronous jobs.'''
coros = self.block_processor.coros()
if False:
self.start_servers()
coros.append(self.reap_jobs())
coros.append(self.run_jobs())
for coro in coros:
asyncio.ensure_future(coro)
# Signal handlers
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
self.loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
def start_servers(self):
protocol = partial(LocalRPC, self)
async def on_update(self, height, touched):
if not self.servers:
self.servers = await self.start_servers()
ElectrumX.notify(height, touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified. Does
nothing if servers are already running.
'''
servers = []
env = self.env
loop = self.loop
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(loop.run_until_complete(rpc_server))
servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, self, self.daemon, env)
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(loop.run_until_complete(tcp_server))
servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
ssl_server = loop.create_server(protocol, env.host, env.ssl_port)
self.servers.append(loop.run_until_complete(ssl_server))
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
return servers
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
@ -85,81 +110,18 @@ class Controller(LoggedClass):
for task in asyncio.Task.all_tasks(self.loop):
task.cancel()
def add_session(self, session):
self.sessions.add(session)
def remove_session(self, session):
self.sessions.remove(session)
def add_job(self, coro):
'''Queue a job for asynchronous processing.'''
self.jobs.add(asyncio.ensure_future(coro))
self.jobs.put_nowait(coro)
async def reap_jobs(self):
async def run_jobs(self):
'''Asynchronously run through the job queue.'''
while True:
jobs = set()
for job in self.jobs:
if job.done():
job = await self.jobs.get()
try:
job.result()
except Exception as e:
await job
except asyncio.CancelledError:
raise
except Exception:
# Getting here should probably be considered a bug and fixed
traceback.print_exc()
else:
jobs.add(job)
self.logger.info('reaped {:d} jobs, {:d} jobs pending'
.format(len(self.jobs) - len(jobs), len(jobs)))
self.jobs = jobs
await asyncio.sleep(5)
def address_status(self, hash168):
'''Returns status as 32 bytes.'''
status = self.addresses.get(hash168)
if status is None:
history = self.block_processor.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
status = sha256(status.encode())
self.addresses[hash168] = status
return status
async def get_merkle(self, tx_hash, height):
'''tx_hash is a hex string.'''
block_hash = await self.daemon.send_single('getblockhash', (height,))
block = await self.daemon.send_single('getblock', (block_hash, True))
tx_hashes = block['tx']
# This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash)
idx = pos
hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
merkle_branch = []
while len(hashes) > 1:
if len(hashes) & 1:
hashes.append(hashes[-1])
idx = idx - 1 if (idx & 1) else idx + 1
merkle_branch.append(hash_to_str(hashes[idx]))
idx //= 2
hashes = [double_sha256(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
def get_peers(self):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return self.peers
def height(self):
return self.block_processor.height
def get_current_header(self):
return self.block_processor.get_current_header()
def get_history(self, hash168):
history = self.block_processor.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]

server/daemon.py (33)

@ -1,7 +1,11 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Classes for handling asynchronous connections to a blockchain
'''Class for handling asynchronous connections to a blockchain
daemon.'''
import asyncio
@ -67,8 +71,7 @@ class Daemon(LoggedClass):
msg = 'daemon still warming up.'
secs = 30
else:
msg = '{}'.format(errs)
raise DaemonError(msg)
raise DaemonError(errs)
self.logger.error('{}. Sleeping {:d}s and trying again...'
.format(msg, secs))
@ -86,6 +89,28 @@ class Daemon(LoggedClass):
# Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks]
async def mempool_hashes(self):
'''Return the hashes of the txs in the daemon's mempool.'''
return await self.send_single('getrawmempool')
async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.'''
return await self.send_single('estimatefee', params)
async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
net_info = await self.send_single('getnetworkinfo')
return net_info['relayfee']
async def getrawtransaction(self, hex_hash):
'''Return the serialized raw transaction with the given hash.'''
return await self.send_single('getrawtransaction', (hex_hash, 0))
async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.'''
return await self.send_single('sendrawtransaction', params)
async def height(self):
'''Query the daemon for its current height.'''
self._height = await self.send_single('getblockcount')
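A hedged sketch of the new wrappers in use; the URL is a placeholder
for a daemon with RPC enabled, as elsewhere in this commit the real
value comes from DAEMON_URL::

    import asyncio
    from server.daemon import Daemon

    async def main():
        daemon = Daemon('http://user:password@localhost:8332/')
        print(await daemon.height())     # current block count
        print(await daemon.relayfee())   # daemon's minimum relay fee
        hashes = await daemon.mempool_hashes()
        if hashes:
            print(await daemon.getrawtransaction(hashes[0]))

    asyncio.get_event_loop().run_until_complete(main())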

server/env.py (12)

@ -1,6 +1,13 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Class for handling environment configuration and defaults.'''
from os import environ
from lib.coins import Coin
@ -27,6 +34,9 @@ class Env(LoggedClass):
# Server stuff
self.tcp_port = self.integer('TCP_PORT', None)
self.ssl_port = self.integer('SSL_PORT', None)
if self.ssl_port:
self.ssl_certfile = self.required('SSL_CERTFILE')
self.ssl_keyfile = self.required('SSL_KEYFILE')
self.rpc_port = self.integer('RPC_PORT', 8000)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None)

server/protocol.py (525)

@ -1,164 +1,449 @@
# See the file "LICENSE" for information about the copyright
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Classes for local RPC server and remote client TCP/SSL servers.'''
import asyncio
import codecs
import json
import struct
import traceback
from functools import partial
from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass
from server.version import VERSION
class Error(Exception):
BAD_REQUEST = 1
INTERNAL_ERROR = 2
class RPCError(Exception):
'''RPC handlers raise this error.'''
def json_notification(method, params):
'''Create a json notification.'''
return {'id': None, 'method': method, 'params': params}
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Base class that manages a JSONRPC connection.'''
SESSIONS = set()
def __init__(self, controller):
def __init__(self):
super().__init__()
self.controller = controller
self.parts = []
self.send_count = 0
self.send_size = 0
self.error_count = 0
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
peername = transport.get_extra_info('peername')
self.logger.info('connection from {}'.format(peername))
self.controller.add_session(self)
self.peername = transport.get_extra_info('peername')
self.logger.info('connection from {}'.format(self.peername))
self.SESSIONS.add(self)
def connection_lost(self, exc):
self.logger.info('disconnected')
self.controller.remove_session(self)
'''Handle client disconnection.'''
self.logger.info('{} disconnected. '
'Sent {:,d} bytes in {:,d} messages {:,d} errors'
.format(self.peername, self.send_size,
self.send_count, self.error_count))
self.SESSIONS.remove(self)
def data_received(self, data):
'''Handle incoming data (synchronously).
Requests end in newline characters. Pass complete requests to
decode_message for handling.
'''
while True:
npos = data.find(ord('\n'))
if npos == -1:
self.parts.append(data)
break
tail, data = data[:npos], data[npos + 1:]
parts = self.parts
self.parts = []
parts, self.parts = self.parts, []
parts.append(tail)
self.decode_message(b''.join(parts))
if data:
self.parts.append(data)
def decode_message(self, message):
'''Message is a binary message.'''
'''Decode a binary message and queue it for asynchronous handling.'''
try:
message = json.loads(message.decode())
except Exception as e:
self.logger.info('caught exception decoding message'.format(e))
return
job = self.request_handler(message)
self.controller.add_job(job)
self.logger.info('error decoding JSON message: {}'.format(e))
else:
self.ADD_JOB(self.request_handler(message))
async def request_handler(self, request):
'''Called asynchronously.'''
error = result = None
try:
result = await self.json_handler(request)
except Error as e:
error = {'code': e.args[0], 'message': e.args[1]}
except asyncio.CancelledError:
raise
except Exception as e:
# This should be considered a bug and fixed
traceback.print_exc()
error = {'code': Error.INTERNAL_ERROR, 'message': str(e)}
handler = self.rpc_handler(request.get('method'),
request.get('params', []))
result = await handler()
except RPCError as e:
self.error_count += 1
error = {'code': 1, 'message': e.args[0]}
payload = {'id': request.get('id'), 'error': error, 'result': result}
try:
data = json.dumps(payload) + '\n'
except TypeError:
msg = 'cannot JSON encode response to request {}'.format(request)
self.logger.error(msg)
error = {'code': Error.INTERNAL_ERROR, 'message': msg}
payload = {'id': request.get('id'), 'error': error, 'result': None}
data = json.dumps(payload) + '\n'
self.transport.write(data.encode())
async def json_handler(self, request):
method = request.get('method')
self.json_send(payload)
def json_send(self, payload):
data = (json.dumps(payload) + '\n').encode()
self.transport.write(data)
self.send_count += 1
self.send_size += len(data)
def rpc_handler(self, method, params):
handler = None
if isinstance(method, str):
handler_name = 'handle_{}'.format(method.replace('.', '_'))
handler = getattr(self, handler_name, None)
handler = self.handlers.get(method)
if not handler:
self.logger.info('unknown method: {}'.format(method))
raise Error(Error.BAD_REQUEST, 'unknown method: {}'.format(method))
params = request.get('params', [])
if not isinstance(params, list):
raise Error(Error.BAD_REQUEST, 'params should be an array')
return await handler(params)
raise RPCError('unknown method: {}'.format(method))
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
if not isinstance(params, list):
raise RPCError('params should be an array')
def __init__(self, controller, daemon, env):
super().__init__(controller)
self.daemon = daemon
self.env = env
self.addresses = set()
self.subscribe_headers = False
return partial(handler, self, params)
def params_to_hash168(self, params):
if len(params) != 1:
raise Error(Error.BAD_REQUEST,
'params should contain a single address')
address = params[0]
@classmethod
def tx_hash_from_param(cls, param):
'''Raise an RPCError if the parameter is not a valid transaction
hash.'''
if isinstance(param, str) and len(param) == 64:
try:
return self.env.coin.address_to_hash168(address)
bytes.fromhex(param)
return param
except ValueError:
pass
raise RPCError('parameter should be a transaction hash: {}'
.format(param))
@classmethod
def hash168_from_param(cls, param):
if isinstance(param, str):
try:
return cls.COIN.address_to_hash168(param)
except:
raise Error(Error.BAD_REQUEST,
'invalid address: {}'.format(address))
pass
raise RPCError('parameter should be a valid address: {}'.format(param))
async def handle_blockchain_address_get_history(self, params):
hash168 = self.params_to_hash168(params)
return self.controller.get_history(hash168)
@classmethod
def non_negative_integer_from_param(cls, param):
try:
param = int(param)
except ValueError:
pass
else:
if param >= 0:
return param
raise RPCError('param should be a non-negative integer: {}'
.format(param))
@classmethod
def extract_hash168(cls, params):
if len(params) == 1:
return cls.hash168_from_param(params[0])
raise RPCError('params should contain a single address: {}'
.format(params))
@classmethod
def extract_non_negative_integer(cls, params):
if len(params) == 1:
return cls.non_negative_integer_from_param(params[0])
raise RPCError('params should contain a non-negative integer: {}'
.format(params))
@classmethod
def require_empty_params(cls, params):
if params:
raise RPCError('params should be empty: {}'.format(params))
@classmethod
def init(cls, block_processor, daemon, coin, add_job):
cls.BLOCK_PROCESSOR = block_processor
cls.DAEMON = daemon
cls.COIN = coin
cls.ADD_JOB = add_job
@classmethod
def height(cls):
'''Return the current height.'''
return cls.BLOCK_PROCESSOR.height
@classmethod
def electrum_header(cls, height=None):
'''Return the binary header at the given height.'''
if not 0 <= height <= cls.height():
raise RPCError('height {:,d} out of range'.format(height))
header = cls.BLOCK_PROCESSOR.read_headers(height, 1)
return cls.COIN.electrum_header(header, height)
@classmethod
def current_electrum_header(cls):
'''Used as response to a headers subscription request.'''
return cls.electrum_header(cls.height())
class ElectrumX(JSONRPC):
'''A TCP server that handles incoming Electrum connections.'''
def __init__(self, env):
super().__init__()
self.env = env
self.hash168s = set()
self.subscribe_headers = False
self.subscribe_height = False
self.notified_height = None
rpcs = [
('blockchain',
'address.get_balance address.get_history address.get_mempool '
'address.get_proof address.listunspent address.subscribe '
'block.get_header block.get_chunk estimatefee headers.subscribe '
'numblocks.subscribe relayfee transaction.broadcast '
'transaction.get transaction.get_merkle utxo.get_address'),
('server',
'banner donation_address peers.subscribe version'),
]
self.handlers = {'.'.join([prefix, suffix]):
getattr(self.__class__, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
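        # For illustration, the comprehension above builds a dispatch table
        # like (a few sample entries):
        #
        #   'blockchain.address.get_balance' -> ElectrumX.address_get_balance
        #   'blockchain.headers.subscribe'   -> ElectrumX.headers_subscribe
        #   'server.banner'                  -> ElectrumX.banner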
@classmethod
def watched_address_count(cls):
        return sum(len(session.hash168s) for session in cls.SESSIONS
                   if isinstance(session, cls))
@classmethod
def notify(cls, height, touched):
'''Notify electrum clients about height changes and touched
addresses.'''
headers_payload = json_notification(
'blockchain.headers.subscribe',
(cls.electrum_header(height), ),
)
height_payload = json_notification(
'blockchain.numblocks.subscribe',
(height, ),
)
hash168_to_address = cls.COIN.hash168_to_address
for session in cls.SESSIONS:
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
session.json_send(headers_payload)
if session.subscribe_height:
session.json_send(height_payload)
for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168)
status = cls.address_status(hash168)
payload = json_notification('blockchain.address.subscribe',
(address, status))
session.json_send(payload)
@classmethod
def address_status(cls, hash168):
        '''Return an address status as a hex string, or None if the
        address has no history.'''
history = cls.BLOCK_PROCESSOR.get_history(hash168)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
if status:
return sha256(status.encode()).hex()
return None
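    # For illustration with hypothetical values: a history of two confirmed
    # transactions gives
    #   status = '<tx_hash_1>:100000:<tx_hash_2>:100001:'
    # and the returned value is sha256(status).hex().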
@classmethod
async def tx_merkle(cls, tx_hash, height):
'''tx_hash is a hex string.'''
block_hash = await cls.DAEMON.send_single('getblockhash', (height,))
block = await cls.DAEMON.send_single('getblock', (block_hash, True))
tx_hashes = block['tx']
# This will throw if the tx_hash is bad
pos = tx_hashes.index(tx_hash)
idx = pos
hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
merkle_branch = []
while len(hashes) > 1:
if len(hashes) & 1:
hashes.append(hashes[-1])
idx = idx - 1 if (idx & 1) else idx + 1
merkle_branch.append(hash_to_str(hashes[idx]))
idx //= 2
hashes = [double_sha256(hashes[n] + hashes[n + 1])
for n in range(0, len(hashes), 2)]
return {"block_height": height, "merkle": merkle_branch, "pos": pos}
@classmethod
def irc_peers(cls):
'''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one
per peer.'''
return {}
@classmethod
def get_history(cls, hash168):
history = cls.BLOCK_PROCESSOR.get_history(hash168, limit=None)
return [
{'tx_hash': hash_to_str(tx_hash), 'height': height}
for tx_hash, height in history
]
@classmethod
def get_chunk(cls, index):
'''Return header chunk as hex. Index is a non-negative integer.'''
chunk_size = cls.COIN.CHUNK_SIZE
next_height = cls.height() + 1
start_height = min(index * chunk_size, next_height)
count = min(next_height - start_height, chunk_size)
return cls.BLOCK_PROCESSOR.read_headers(start_height, count).hex()
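    # Example arithmetic: with a CHUNK_SIZE of 2016 and current height
    # 429,999, index 213 gives start_height 429,408 and count 592.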
@classmethod
def get_balance(cls, hash168):
confirmed = cls.BLOCK_PROCESSOR.get_balance(hash168)
unconfirmed = -1 # FIXME
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
@classmethod
def list_unspent(cls, hash168):
utxos = cls.BLOCK_PROCESSOR.get_utxos_sorted(hash168)
return tuple({'tx_hash': hash_to_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, 'height': utxo.height,
'value': utxo.value}
for utxo in utxos)
# --- blockchain commands
async def address_get_balance(self, params):
hash168 = self.extract_hash168(params)
return self.get_balance(hash168)
async def address_get_history(self, params):
hash168 = self.extract_hash168(params)
return self.get_history(hash168)
async def address_get_mempool(self, params):
hash168 = self.extract_hash168(params)
raise RPCError('get_mempool is not yet implemented')
async def address_get_proof(self, params):
hash168 = self.extract_hash168(params)
raise RPCError('get_proof is not yet implemented')
async def address_listunspent(self, params):
hash168 = self.extract_hash168(params)
return self.list_unspent(hash168)
async def address_subscribe(self, params):
hash168 = self.extract_hash168(params)
self.hash168s.add(hash168)
return self.address_status(hash168)
async def block_get_chunk(self, params):
index = self.extract_non_negative_integer(params)
return self.get_chunk(index)
async def block_get_header(self, params):
height = self.extract_non_negative_integer(params)
return self.electrum_header(height)
async def estimatefee(self, params):
return await self.DAEMON.estimatefee(params)
async def headers_subscribe(self, params):
self.require_empty_params(params)
self.subscribe_headers = True
        return self.current_electrum_header()
async def numblocks_subscribe(self, params):
self.require_empty_params(params)
self.subscribe_height = True
return self.height()
async def relayfee(self, params):
        '''The minimum fee a low-priority tx must pay in order to be
        accepted to the daemon's memory pool.'''
self.require_empty_params(params)
return await self.DAEMON.relayfee()
async def transaction_broadcast(self, params):
'''Pass through the parameters to the daemon.
An ugly API: current Electrum clients only pass the raw
transaction in hex and expect error messages to be returned in
the result field. And the server shouldn't be doing the client's
user interface job here.
'''
try:
tx_hash = await self.DAEMON.sendrawtransaction(params)
self.logger.info('sent tx: {}'.format(tx_hash))
return tx_hash
except DaemonError as e:
errors = e.args[0]
error = errors[0]
message = error['message']
self.logger.info('sendrawtransaction: {}'.format(message))
if 'non-mandatory-script-verify-flag' in message:
return (
'Your client produced a transaction that is not accepted '
'by the network any more. Please upgrade to Electrum '
'2.5.1 or newer.'
)
return (
'The transaction was rejected by network rules. ({})\n[{}]'
.format(message, params[0])
)
async def transaction_get(self, params):
'''Return the serialized raw transaction.'''
# For some reason Electrum passes a height. Don't require it
# in anticipation it might be dropped in the future.
if 1 <= len(params) <= 2:
tx_hash = self.tx_hash_from_param(params[0])
return await self.DAEMON.getrawtransaction(tx_hash)
raise RPCError('params wrong length: {}'.format(params))
async def transaction_get_merkle(self, params):
if len(params) == 2:
tx_hash = self.tx_hash_from_param(params[0])
height = self.non_negative_integer_from_param(params[1])
return await self.tx_merkle(tx_hash, height)
raise RPCError('params should contain a transaction hash and height')
async def utxo_get_address(self, params):
if len(params) == 2:
tx_hash = self.tx_hash_from_param(params[0])
index = self.non_negative_integer_from_param(params[1])
tx_hash = hex_str_to_hash(tx_hash)
hash168 = self.BLOCK_PROCESSOR.get_utxo_hash168(tx_hash, index)
if hash168:
return self.COIN.hash168_to_address(hash168)
return None
raise RPCError('params should contain a transaction hash and index')
# --- server commands
async def banner(self, params):
'''Return the server banner.'''
self.require_empty_params(params)
banner = 'Welcome to Electrum!'
if self.env.banner_file:
            try:
                with open(self.env.banner_file, 'r') as f:
                    banner = f.read()
            except Exception as e:
                self.logger.error('reading banner file {}: {}'
                                  .format(self.env.banner_file, e))
return banner
async def donation_address(self, params):
'''Return the donation address as a string.
If none is specified return the empty string.
'''
self.require_empty_params(params)
return self.env.donation_address
async def peers_subscribe(self, params):
'''Returns the peer (ip, host, ports) tuples.
        Despite the name, electrum-server does not treat this as a
subscription.
'''
self.require_empty_params(params)
peers = ElectrumX.irc_peers()
return tuple(peers.values())
async def version(self, params):
'''Return the server version as a string.'''
return VERSION
class LocalRPC(JSONRPC):
'''A local TCP RPC server for querying status.'''
def __init__(self):
super().__init__()
cmds = 'getinfo sessions numsessions peers numpeers'.split()
self.handlers = {cmd: getattr(self.__class__, cmd) for cmd in cmds}
async def getinfo(self, params):
return {
'blocks': self.height(),
'peers': len(ElectrumX.irc_peers()),
'sessions': len(self.SESSIONS),
'watched': ElectrumX.watched_address_count(),
'cached': 0,
}
    async def sessions(self, params):
        return []

    async def numsessions(self, params):
        return len(self.SESSIONS)

    async def peers(self, params):
        return tuple(ElectrumX.irc_peers().keys())

    async def numpeers(self, params):
        return len(ElectrumX.irc_peers())
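    # For illustration, a getinfo exchange over the local RPC socket
    # (values are examples):
    #
    #   --> {"id": 0, "method": "getinfo", "params": []}
    #   <-- {"id": 0, "error": null, "result": {"blocks": 447234, "peers": 0,
    #        "sessions": 2, "watched": 10, "cached": 0}}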

158
server/storage.py

# Copyright (c) 2016, the ElectrumX authors
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Backend database abstraction.
The abstraction needs to be improved to not heavily penalise LMDB.
'''
import os
from functools import partial
from lib.util import subclasses
def open_db(name, db_engine):
'''Returns a database handle.'''
for db_class in subclasses(Storage):
if db_class.__name__.lower() == db_engine.lower():
db_class.import_module()
return db_class(name)
raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
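# A minimal usage sketch (the database name and engine are examples; the
# server normally changes into its database directory first):
#
#   db = open_db('test_db', 'leveldb')
#   with db.write_batch() as batch:
#       batch.put(b'key', b'value')
#   assert db.get(b'key') == b'value'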
class Storage(object):
    '''Abstract base class of the DB backend abstraction.'''

    def __init__(self, name):
self.is_new = not os.path.exists(name)
self.open(name, create=self.is_new)
@classmethod
def import_module(cls):
'''Import the DB engine module.'''
raise NotImplementedError
def open(self, name, create):
'''Open an existing database or create a new one.'''
raise NotImplementedError
def get(self, key):
        raise NotImplementedError
def put(self, key, value):
        raise NotImplementedError
def write_batch(self):
"""
Returns a context manager that provides `put` and `delete`.
Changes should only be committed when the context manager closes without an exception.
"""
raise NotImplementedError()
'''Return a context manager that provides `put` and `delete`.
def iterator(self, prefix=b""):
"""
Returns an iterator that yields (key, value) pairs from the database sorted by key.
If `prefix` is set, only keys starting with `prefix` will be included.
"""
raise NotImplementedError()
Changes should only be committed when the context manager
closes without an exception.
'''
raise NotImplementedError
def iterator(self, prefix=b'', reverse=False):
'''Return an iterator that yields (key, value) pairs from the
database sorted by key.
class NoDatabaseException(Exception):
pass
If `prefix` is set, only keys starting with `prefix` will be
included. If `reverse` is True the items are returned in
reverse order.
'''
raise NotImplementedError
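# For illustration, callers iterate a keyspace like so (the prefix and
# process() are stand-ins, not part of the interface):
#
#   for key, value in db.iterator(prefix=b'u'):
#       process(key, value)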
class LevelDB(Storage):
    '''LevelDB database engine.'''

    @classmethod
    def import_module(cls):
        import plyvel
        cls.module = plyvel

    def open(self, name, create):
        self.db = self.module.DB(name, create_if_missing=create,
                                 compression=None)
        self.get = self.db.get
        self.put = self.db.put
        self.iterator = self.db.iterator
        self.write_batch = partial(self.db.write_batch, transaction=True)
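        # Design note: binding get/put/iterator/write_batch directly to the
        # plyvel methods removes a Python-level wrapper call on these hot
        # paths.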
class RocksDB(Storage):
    '''RocksDB database engine.'''

    @classmethod
    def import_module(cls):
        import rocksdb
        cls.module = rocksdb

    def open(self, name, create):
        compression = "no"
        compression = getattr(self.module.CompressionType,
                              compression + "_compression")
        options = self.module.Options(create_if_missing=create,
                                      compression=compression,
                                      target_file_size_base=33554432,
                                      max_open_files=1024)
        self.db = self.module.DB(name, options)
        self.get = self.db.get
        self.put = self.db.put
class WriteBatch(object):
def __init__(self, db):
            self.batch = RocksDB.module.WriteBatch()
self.db = db
def __enter__(self):
            return self.batch

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Commit only if the context exited without an exception.
            if not exc_val:
                self.db.write(self.batch)

    def write_batch(self):
        return RocksDB.WriteBatch(self.db)
class Iterator(object):
        def __init__(self, db, prefix, reverse):
self.it = db.iteritems()
if reverse:
self.it = reversed(self.it)
self.prefix = prefix
def __iter__(self):
            # Position the underlying iterator at the start of the prefix.
            self.it.seek(self.prefix)
            return self

        def __next__(self):
            k, v = self.it.__next__()
            if not k.startswith(self.prefix):
                raise StopIteration
            return k, v
def iterator(self, prefix=b""):
return RocksDB.Iterator(self.db, prefix)
def put(self, key, value):
return self.db.put(key, value)
def iterator(self, prefix=b'', reverse=False):
return RocksDB.Iterator(self.db, prefix, reverse)
class LMDB(Storage):
    '''LMDB database engine.'''

    @classmethod
    def import_module(cls):
        import lmdb
        cls.module = lmdb

    def open(self, name, create):
        self.env = self.module.Environment('.', subdir=True, create=create,
                                           max_dbs=32, map_size=5 * 10 ** 10)
        self.db = self.env.open_db(create=create)
def get(self, key):
with self.env.begin(db=self.db) as tx:
            return tx.get(key)

    def put(self, key, value):
        # A minimal sketch: write a single key in its own transaction.
        with self.env.begin(db=self.db, write=True) as tx:
            tx.put(key, value)
def write_batch(self):
return self.env.begin(db=self.db, write=True)
def iterator(self, prefix=b""):
return LMDB.lmdb.Iterator(self.db, self.env, prefix)
def iterator(self, prefix=b'', reverse=False):
return LMDB.Iterator(self.db, self.env, prefix, reverse)
class Iterator:
        def __init__(self, db, env, prefix, reverse):
self.transaction = env.begin(db=db)
self.transaction.__enter__()
self.db = db
self.prefix = prefix
self.reverse = reverse # FIXME
def __iter__(self):
            self.iterator = LMDB.module.Cursor(self.db, self.transaction)

2
server/version.py

VERSION = "ElectrumX 0.04"

11
server_main.py

#!/usr/bin/env python3
# See the file "LICENSE" for information about the copyright
#
# Copyright (c) 2016, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Script to kick off the server.'''
import asyncio
import logging
import os
