You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
714 lines
26 KiB
714 lines
26 KiB
import json
|
|
import threading
|
|
import time
|
|
import os
|
|
import stat
|
|
import ssl
|
|
from decimal import Decimal
|
|
from typing import Union, Optional, Dict, Sequence, Tuple
|
|
from numbers import Real
|
|
|
|
from copy import deepcopy
|
|
from aiorpcx import NetAddress
|
|
|
|
from . import util
|
|
from . import constants
|
|
from .util import base_units, base_unit_name_to_decimal_point, decimal_point_to_base_unit_name, UnknownBaseUnit, DECIMAL_POINT_DEFAULT
|
|
from .util import format_satoshis, format_fee_satoshis
|
|
from .util import user_dir, make_dir, NoDynamicFeeEstimates, quantize_feerate
|
|
from .i18n import _
|
|
from .logging import get_logger, Logger
|
|
|
|
|
|
# Confirmation targets (in blocks) offered by the ETA fee slider.
FEE_ETA_TARGETS = [25, 10, 5, 2]
# Mempool depth targets offered by the mempool fee slider.
FEE_DEPTH_TARGETS = [
    10_000_000,
    5_000_000,
    2_000_000,
    1_000_000,
    500_000,
    200_000,
    100_000,
]
FEE_LN_ETA_TARGET = 2  # note: make sure the network is asking for estimates for this target

# satoshi per kbyte
FEERATE_MAX_DYNAMIC = 1_500_000
FEERATE_WARNING_HIGH_FEE = 600_000
FEERATE_FALLBACK_STATIC_FEE = 150_000
FEERATE_DEFAULT_RELAY = 1_000
FEERATE_MAX_RELAY = 50_000
FEERATE_STATIC_VALUES = [
    1_000, 2_000, 5_000, 10_000, 20_000, 30_000,
    50_000, 70_000, 100_000, 150_000, 200_000, 300_000,
]
FEERATE_REGTEST_HARDCODED = 180_000  # for eclair compat

# The min feerate_per_kw that can be used in lightning so that
# the resulting onchain tx pays the min relay fee.
# This would be FEERATE_DEFAULT_RELAY / 4 if not for rounding errors,
# see https://github.com/ElementsProject/lightning/commit/2e687b9b352c9092b5e8bd4a688916ac50b44af0
FEERATE_PER_KW_MIN_RELAY_LIGHTNING = 253

FEE_RATIO_HIGH_WARNING = 0.05  # warn user if fee/amount for on-chain tx is higher than this
|
|
|
|
|
|
# Module-level logger, obtained from get_logger() for this module's name.
_logger = get_logger(__name__)


# Newest config version: a config reporting a lower 'config_version'
# is upgraded, and a brand-new config starts at this version.
FINAL_CONFIG_VERSION = 3
|
|
|
|
|
|
class SimpleConfig(Logger):
|
|
"""
|
|
The SimpleConfig class is responsible for handling operations involving
|
|
configuration files.
|
|
|
|
There are two different sources of possible configuration values:
|
|
1. Command line options.
|
|
2. User configuration (in the user's config directory)
|
|
They are taken in order (1. overrides config options set in 2.)
|
|
"""
|
|
|
|
def __init__(self, options=None, read_user_config_function=None,
             read_user_dir_function=None):
    """Build the config from CLI options layered over the stored user config.

    options: mapping of command line options; it is deep-copied, and
        None is treated as an empty mapping.
    read_user_config_function: callable taking the data directory path and
        returning the user config; injectable for testing.
    read_user_dir_function: callable returning the default data directory;
        injectable for testing.
    """
    cli_options = {} if options is None else options

    Logger.__init__(self)

    # This lock needs to be held when reading or updating the config
    # from multiple threads.
    self.lock = threading.RLock()

    self.mempool_fees = None  # type: Optional[Sequence[Tuple[Union[float, int], int]]]
    self.fee_estimates = {}  # type: Dict[int, int]
    self.last_time_fee_estimates_requested = 0  # zero ensures immediate fees

    # Dependency injection points for tests: fall back to the real
    # implementations when nothing was passed in.
    load_user_config = (read_user_config if read_user_config_function is None
                        else read_user_config_function)
    self.user_dir = user_dir if read_user_dir_function is None else read_user_dir_function

    # The command line options; 'config_version' may not be set on the CLI.
    self.cmdline_options = deepcopy(cli_options)
    self.cmdline_options.pop('config_version', None)

    # electrum_path() calls self.get(), which needs user_config to exist.
    self.user_config = {}
    self.path = self.electrum_path()
    stored_config = load_user_config(self.path)
    # A new (empty) config starts at the final version so it is not upgraded.
    self.user_config = stored_config if stored_config else {'config_version': FINAL_CONFIG_VERSION}

    self._not_modifiable_keys = set()

    # config "upgrade" - CLI options
    self.rename_config_keys(
        self.cmdline_options, {'auto_cycle': 'auto_connect'}, True)

    # config upgrade - user config
    if self.requires_upgrade():
        self.upgrade()

    self._check_dependent_keys()

    # units and formatting
    self.decimal_point = self.get('decimal_point', DECIMAL_POINT_DEFAULT)
    try:
        decimal_point_to_base_unit_name(self.decimal_point)
    except UnknownBaseUnit:
        # Stored value does not map to a known base unit: use the default.
        self.decimal_point = DECIMAL_POINT_DEFAULT
    self.num_zeros = int(self.get('num_zeros', 0))
    self.amt_precision_post_satoshi = int(self.get('amt_precision_post_satoshi', 0))
    self.amt_add_thousands_sep = bool(self.get('amt_add_thousands_sep', False))
|
|
|
|
def electrum_path(self):
    """Return the data directory for the selected network, creating it.

    The base directory is the 'electrum_path' config value, or the
    result of self.user_dir() when that is unset. If one of the
    'testnet', 'regtest', 'simnet' or 'signet' options is set (checked
    in that order), the sub-directory of the same name is used.
    """
    base_dir = self.get('electrum_path')
    if base_dir is None:
        base_dir = self.user_dir()
    path = base_dir

    make_dir(path, allow_symlink=False)
    # Only the first enabled network option selects a sub-directory.
    for network in ('testnet', 'regtest', 'simnet', 'signet'):
        if self.get(network):
            path = os.path.join(path, network)
            make_dir(path, allow_symlink=False)
            break

    self.logger.info(f"electrum directory {path}")
    return path
|
|
|
|
def rename_config_keys(self, config, keypairs, deprecation_warning=False):
    """Migrate old key names to new ones, in place.

    config: dict to modify.
    keypairs: mapping of old key name -> new key name.
    deprecation_warning: if true, log a warning for each migrated key.

    An existing value under the new key is never overwritten; the old key
    is removed either way. Returns True if any old key was found.
    """
    changed = False
    for old_key, new_key in keypairs.items():
        if old_key not in config:
            continue
        if new_key not in config:
            config[new_key] = config[old_key]
            if deprecation_warning:
                self.logger.warning('Note that the {} variable has been deprecated. '
                                    'You should use {} instead.'.format(old_key, new_key))
        del config[old_key]
        changed = True
    return changed
|
|
|
|
def set_key(self, key, value, save=True):
    """Set a config key in the user config, if allowed and serializable.

    key, value: must both be JSON-serializable, otherwise the change is
        logged and dropped.
    save: forwarded to _set_key_in_user_config().

    Keys for which is_modifiable() is false are left untouched (a
    warning is logged).
    """
    if not self.is_modifiable(key):
        self.logger.warning(f"not changing config key '{key}' set on the command line")
        return
    try:
        json.dumps(key)
        json.dumps(value)
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
        self.logger.info(f"json error: cannot save {repr(key)} ({repr(value)})")
        return
    self._set_key_in_user_config(key, value, save)
|
|
|
|
def _set_key_in_user_config(self, key, value, save=True):
|
|
with self.lock:
|
|
if value is not None:
|
|
self.user_config[key] = value
|
|
else:
|
|
self.user_config.pop(key, None)
|
|
if save:
|
|
self.save_user_config()
|
|
|
|
def get(self, key, default=None):
    """Look up a config value; command line options take precedence.

    Falls back to the user config (and then to `default`) when the key
    is missing from the command line options or set to None there.
    """
    with self.lock:
        cli_value = self.cmdline_options.get(key)
        if cli_value is not None:
            return cli_value
        return self.user_config.get(key, default)
|
|
|
|
def _check_dependent_keys(self) -> None:
|
|
if self.get('serverfingerprint'):
|
|
if not self.get('server'):
|
|
raise Exception("config key 'serverfingerprint' requires 'server' to also be set")
|
|
self.make_key_not_modifiable('server')
|
|
|
|
def requires_upgrade(self):
    """Return True if the config version is older than FINAL_CONFIG_VERSION."""
    current_version = self.get_config_version()
    return current_version < FINAL_CONFIG_VERSION
|
|
|
|
def upgrade(self):
    """Run the version conversion steps in order, then store the final version.

    Everything runs while holding self.lock.
    """
    with self.lock:
        self.logger.info('upgrading config')

        # Each step checks for itself whether it still needs to run.
        for convert_step in (self.convert_version_2, self.convert_version_3):
            convert_step()

        self.set_key('config_version', FINAL_CONFIG_VERSION, save=True)
|
|
|
|
def convert_version_2(self):
    """Upgrade a version-1 user config to version 2.

    Renames 'auto_cycle' to 'auto_connect' and rewrites the 'server'
    string from host:port:proto to host:port:s. A server string that
    cannot be parsed (including a missing one) is removed from the config.
    Does nothing if _is_upgrade_method_needed(1, 1) is false.
    """
    if not self._is_upgrade_method_needed(1, 1):
        return

    self.rename_config_keys(self.user_config, {'auto_cycle': 'auto_connect'})

    try:
        # change server string FROM host:port:proto TO host:port:s
        server_str = self.user_config.get('server')
        host, port, protocol = str(server_str).rsplit(':', 2)
        # Explicit check rather than assert, so validation survives `python -O`.
        if protocol not in ('s', 't'):
            raise ValueError(f"unexpected protocol {protocol!r}")
        int(port)  # Throw if cannot be converted to int
        server_str = '{}:{}:s'.format(host, port)
        self._set_key_in_user_config('server', server_str)
    except Exception:
        # Narrower than BaseException: lets KeyboardInterrupt/SystemExit through.
        self._set_key_in_user_config('server', None)

    self.set_key('config_version', 2)
|
|
|
|
def convert_version_3(self):
    """Upgrade a version-2 user config to version 3.

    Replaces a string 'base_unit' entry with the matching numeric
    'decimal_point' entry. A unit name that is not recognised leaves
    'decimal_point' unset. Does nothing if
    _is_upgrade_method_needed(2, 2) is false.
    """
    if not self._is_upgrade_method_needed(2, 2):
        return

    unit_name = self.user_config.get('base_unit')
    if isinstance(unit_name, str):
        self._set_key_in_user_config('base_unit', None)
        decimal_point_by_unit = {'btc': 8, 'mbtc': 5, 'ubtc': 2, 'bits': 2, 'sat': 0}
        self._set_key_in_user_config(
            'decimal_point', decimal_point_by_unit.get(unit_name.lower()))

    self.set_key('config_version', 3)
|
|
|
|
def _is_upgrade_method_needed(self, min_version, max_version):
|
|
cur_version = self.get_config_version()
|
|
if cur_version > max_version:
|
|
return False
|
|
elif cur_version < min_version:
|
|
raise Exception(
|
|
('config upgrade: unexpected version %d (should be %d-%d)'
|
|
% (cur_version, min_version, max_version)))
|
|
else:
|
|
return True
|
|
|
|
def get_config_version(self):
    """Return the stored 'config_version' (1 when it is not set).

    Logs a warning if the stored version is newer than FINAL_CONFIG_VERSION.
    """
    version = self.get('config_version', 1)
    if version > FINAL_CONFIG_VERSION:
        self.logger.warning('config version ({}) is higher than latest ({})'
                            .format(version, FINAL_CONFIG_VERSION))
    return version
|
|
|
|
def is_modifiable(self, key) -> bool:
    """Return True unless the key came from the command line or was locked."""
    if key in self.cmdline_options:
        return False
    return key not in self._not_modifiable_keys
|
|
|
|
def make_key_not_modifiable(self, key) -> None:
    """Add `key` to the set of keys that is_modifiable() rejects."""
    locked_keys = self._not_modifiable_keys
    locked_keys.add(key)
|
|
|
|
def save_user_config(self):
    """Write the user config as JSON to the "config" file in self.path.

    Nothing is written when 'forget_config' is set or self.path is
    empty. After writing, the file mode is set to owner read/write.
    """
    if self.get('forget_config') or not self.path:
        return
    config_file = os.path.join(self.path, "config")
    serialized = json.dumps(self.user_config, indent=4, sort_keys=True)
    try:
        with open(config_file, "w", encoding='utf-8') as f:
            f.write(serialized)
        os.chmod(config_file, stat.S_IREAD | stat.S_IWRITE)
    except FileNotFoundError:
        # datadir probably deleted while running...
        if os.path.exists(self.path):  # or maybe not?
            raise
|
|
|
|
def get_backup_dir(self):
    """Return the 'backup_dir' config value, or None on Android.

    Android is detected via the ANDROID_DATA environment variable.
    """
    # this is used to save a backup everytime a channel is created
    # on Android, the export backup button uses android_backup_dir()
    running_on_android = 'ANDROID_DATA' in os.environ
    return None if running_on_android else self.get('backup_dir')
|
|
|
|
def get_wallet_path(self, *, use_gui_last_wallet=False):
    """Return the path of the wallet file to use.

    Order of preference:
    1. the 'wallet_path' option, joined onto the 'cwd' option;
    2. if use_gui_last_wallet is set, the 'gui_last_wallet' value,
       provided that path exists;
    3. "wallets/default_wallet" inside the data directory. A legacy
       "electrum.dat" in the data directory is renamed to it when the
       default wallet does not exist yet.
    """
    # command line -w option
    if self.get('wallet_path'):
        return os.path.join(self.get('cwd', ''), self.get('wallet_path'))

    if use_gui_last_wallet:
        last_wallet = self.get('gui_last_wallet')
        if last_wallet and os.path.exists(last_wallet):
            return last_wallet

    # default path
    util.assert_datadir_available(self.path)
    wallets_dir = os.path.join(self.path, "wallets")
    make_dir(wallets_dir, allow_symlink=False)

    default_wallet = os.path.join(self.path, "wallets", "default_wallet")

    # default path in pre 1.9 versions
    legacy_wallet = os.path.join(self.path, "electrum.dat")
    legacy_only = os.path.exists(legacy_wallet) and not os.path.exists(default_wallet)
    if legacy_only:
        os.rename(legacy_wallet, default_wallet)

    return default_wallet
|
|
|
|
def remove_from_recently_open(self, filename):
    """Remove `filename` from the 'recently_open' list and store the list.

    Does nothing when the filename is not in the list.
    """
    recent_files = self.get('recently_open', [])
    if filename not in recent_files:
        return
    recent_files.remove(filename)
    self.set_key('recently_open', recent_files)
|
|
|
|
def set_session_timeout(self, seconds):
    """Log and store the 'session_timeout' config value."""
    message = f"session timeout -> {seconds} seconds"
    self.logger.info(message)
    self.set_key('session_timeout', seconds)
|
|
|
|
def get_session_timeout(self):
    """Return the 'session_timeout' config value, 300 when unset."""
    timeout = self.get('session_timeout', 300)
    return timeout
|
|
|
|
def save_last_wallet(self, wallet):
    """Remember the wallet's storage path as 'gui_last_wallet'.

    Skipped when a 'wallet_path' is configured.
    """
    if self.get('wallet_path') is not None:
        return
    self.set_key('gui_last_wallet', wallet.storage.path)
|
|
|
|
def impose_hard_limits_on_fee(func):
    """Decorator clamping a fee-returning method to the hard limits.

    The wrapped method's result is limited to the range
    [FEERATE_DEFAULT_RELAY, FEERATE_MAX_DYNAMIC]; a None result is
    passed through unchanged. The wrapper keeps the wrapped method's
    name and docstring.
    """
    # Local import: keeps this block self-contained.
    from functools import wraps

    @wraps(func)
    def get_fee_within_limits(self, *args, **kwargs):
        fee = func(self, *args, **kwargs)
        if fee is None:
            return fee
        fee = min(FEERATE_MAX_DYNAMIC, fee)
        fee = max(FEERATE_DEFAULT_RELAY, fee)
        return fee
    return get_fee_within_limits
|
|
|
|
def eta_to_fee(self, slider_pos) -> Optional[int]:
    """Returns fee in sat/kbyte for an ETA slider position.

    The position is clamped to [0, len(FEE_ETA_TARGETS)]; the last
    position means a target of one block.
    """
    target_count = len(FEE_ETA_TARGETS)
    pos = min(max(slider_pos, 0), target_count)
    if pos < target_count:
        return self.eta_target_to_fee(FEE_ETA_TARGETS[int(pos)])
    else:
        return self.eta_target_to_fee(1)
|
|
|
|
@impose_hard_limits_on_fee
def eta_target_to_fee(self, num_blocks: int) -> Optional[int]:
    """Returns fee in sat/kbyte for a confirmation target in blocks.

    For a one-block target the two-block estimate is used, increased by
    half. Returns None when the needed estimate is not available.
    """
    if num_blocks == 1:
        two_block_fee = self.fee_estimates.get(2)
        if two_block_fee is None:
            return None
        return int(two_block_fee + two_block_fee / 2)
    estimate = self.fee_estimates.get(num_blocks)
    return None if estimate is None else int(estimate)
|
|
|
|
def fee_to_depth(self, target_fee: Real) -> Optional[int]:
    """For a given sat/vbyte fee, returns an estimate of how deep
    it would be in the current mempool in vbytes.
    Pessimistic == overestimates the depth.

    Returns None when no mempool fee data is available.
    """
    histogram = self.mempool_fees
    if histogram is None:
        return None
    total = 0
    for bucket_fee, bucket_size in histogram:
        # The bucket that reaches the target fee is counted in full.
        total += bucket_size
        if bucket_fee <= target_fee:
            return total
    return total
|
|
|
|
def depth_to_fee(self, slider_pos) -> Optional[int]:
    """Returns fee in sat/kbyte for a mempool-depth slider position."""
    return self.depth_target_to_fee(self.depth_target(slider_pos))
|
|
|
|
@impose_hard_limits_on_fee
def depth_target_to_fee(self, target: int) -> Optional[int]:
    """Returns fee in sat/kbyte.

    target: desired mempool depth in vbytes

    Returns None when no mempool fee data is available, and 0 (before
    the decorator's limits apply) when the whole histogram fits within
    the target depth.
    """
    if self.mempool_fees is None:
        return None
    cumulative_size = 0
    for bucket_fee, bucket_size in self.mempool_fees:
        cumulative_size += bucket_size
        if cumulative_size > target:
            # add one sat/byte as currently that is
            # the max precision of the histogram
            # (well, in case of ElectrumX at least. not for electrs)
            # then convert to sat/kbyte
            return int((bucket_fee + 1) * 1000)
    return 0
|
|
|
|
def depth_target(self, slider_pos: int) -> int:
    """Returns mempool depth target in bytes for a fee slider position.

    The position is clamped to the valid index range of FEE_DEPTH_TARGETS.
    """
    last_index = len(FEE_DEPTH_TARGETS) - 1
    clamped_pos = min(max(slider_pos, 0), last_index)
    return FEE_DEPTH_TARGETS[clamped_pos]
|
|
|
|
def eta_target(self, slider_pos: int) -> int:
    """Returns 'num blocks' ETA target for a fee slider position.

    The position is clamped to [0, len(FEE_ETA_TARGETS)], the same
    range eta_to_fee() uses, so the reported target always matches the
    fee computed for that position. The last position means
    "next block" (1).
    """
    # Without clamping, a negative position would silently index from the
    # end of the list and an oversized one would raise IndexError.
    slider_pos = max(slider_pos, 0)
    slider_pos = min(slider_pos, len(FEE_ETA_TARGETS))
    if slider_pos == len(FEE_ETA_TARGETS):
        return 1
    return FEE_ETA_TARGETS[slider_pos]
|
|
|
|
def fee_to_eta(self, fee_per_kb: Optional[int]) -> int:
    """Returns 'num blocks' ETA estimate for given fee rate,
    or -1 for low fee.

    The estimate is the target whose known fee is closest to
    fee_per_kb. Returns -1 when there are no estimates, when fee_per_kb
    is None, or when fee_per_kb is below half of the estimate for the
    slowest target (FEE_ETA_TARGETS[0]).
    """
    import operator
    lst = list(self.fee_estimates.items())
    next_block_fee = self.eta_target_to_fee(1)
    if next_block_fee is not None:
        lst += [(1, next_block_fee)]
    if not lst or fee_per_kb is None:
        return -1
    dist = map(lambda x: (x[0], abs(x[1] - fee_per_kb)), lst)
    min_target, min_value = min(dist, key=operator.itemgetter(1))
    # The slowest-target estimate may be missing even when other estimates
    # exist; dividing None by 2 would raise TypeError.
    slowest_target_fee = self.fee_estimates.get(FEE_ETA_TARGETS[0])
    if slowest_target_fee is not None and fee_per_kb < slowest_target_fee / 2:
        min_target = -1
    return min_target
|
|
|
|
def depth_tooltip(self, depth: Optional[int]) -> str:
    """Returns text tooltip for given mempool depth (in vbytes).

    The depth is shown in units of one million, with one decimal.
    """
    if depth is not None:
        megabytes = depth / 1_000_000
        return "%.1f MB from tip" % (megabytes)
    return "unknown from tip"
|
|
|
|
def eta_tooltip(self, x):
    """Returns translated tooltip text for an ETA of `x` blocks.

    A negative value stands for a low fee.
    """
    if x < 0:
        return _('Low fee')
    if x == 1:
        return _('In the next block')
    return _('Within {} blocks').format(x)
|
|
|
|
def get_fee_target(self):
    """Return (target text, tooltip, dyn) for the configured fee settings.

    `dyn` is the result of is_dynfee(); the two texts come from
    get_fee_text() for the configured slider position and fee rate.
    """
    dyn = self.is_dynfee()
    mempool = self.use_mempool_fees()
    if mempool:
        slider_pos = self.get_depth_level()
    else:
        slider_pos = self.get_fee_level()
    fee_rate = self.fee_per_kb()
    text, tooltip = self.get_fee_text(slider_pos, dyn, mempool, fee_rate)
    return text, tooltip, dyn
|
|
|
|
def get_fee_status(self):
    """Return a one-line summary of the configured fee target.

    Dynamic fees give "<tooltip> [<target>]", static fees give
    "<target> [Static]".
    """
    target, tooltip, dyn = self.get_fee_target()
    if dyn:
        return tooltip + ' [%s]' % target
    return target + ' [Static]'
|
|
|
|
def get_fee_text(
        self,
        slider_pos: int,
        dyn: bool,
        mempool: bool,
        fee_per_kb: Optional[int],
):
    """Returns (text, tooltip) where
    text is what we target: static fee / num blocks to confirm in / mempool depth
    tooltip is the corresponding estimate (e.g. num blocks for a static fee)

    fee_rate is in sat/kbyte
    """
    if fee_per_kb is not None:
        fee_per_byte = fee_per_kb/1000
        rate_str = format_fee_satoshis(fee_per_byte) + ' sat/byte'
    else:
        fee_per_byte = None
        rate_str = 'unknown'

    if dyn:
        # Dynamic fees: the text is the target, the tooltip the fee rate.
        if mempool:
            text = self.depth_tooltip(self.depth_target(slider_pos))
        else:
            text = self.eta_tooltip(self.eta_target(slider_pos))
        return text, rate_str

    # using static fees
    assert fee_per_kb is not None
    assert fee_per_byte is not None
    if mempool and self.has_fee_mempool():
        tooltip = self.depth_tooltip(self.fee_to_depth(fee_per_byte))
    elif not mempool and self.has_fee_etas():
        tooltip = self.eta_tooltip(self.fee_to_eta(fee_per_kb))
    else:
        tooltip = ''
    return rate_str, tooltip
|
|
|
|
def get_depth_level(self):
    """Return the configured 'depth_level' slider position (default 2),
    capped at the last index of FEE_DEPTH_TARGETS.
    """
    highest_pos = len(FEE_DEPTH_TARGETS) - 1
    configured_pos = self.get('depth_level', 2)
    return min(highest_pos, configured_pos)
|
|
|
|
def get_fee_level(self):
    """Return the configured 'fee_level' slider position (default 2),
    capped at len(FEE_ETA_TARGETS).
    """
    highest_pos = len(FEE_ETA_TARGETS)  # not (-1) to have "next block"
    configured_pos = self.get('fee_level', 2)
    return min(highest_pos, configured_pos)
|
|
|
|
def get_fee_slider(self, dyn, mempool) -> Tuple[int, int, Optional[int]]:
    """Return (max position, current position, fee rate) for the fee slider.

    dyn: whether dynamic fees are used; otherwise the static fee values.
    mempool: with dynamic fees, whether mempool depth targets are used
        instead of ETA targets.
    """
    if not dyn:
        fee_rate = self.fee_per_kb(dyn=False)
        pos = self.static_fee_index(fee_rate)
        maxp = len(FEERATE_STATIC_VALUES) - 1
    elif mempool:
        pos = self.get_depth_level()
        maxp = len(FEE_DEPTH_TARGETS) - 1
        fee_rate = self.depth_to_fee(pos)
    else:
        pos = self.get_fee_level()
        maxp = len(FEE_ETA_TARGETS)  # not (-1) to have "next block"
        fee_rate = self.eta_to_fee(pos)
    return maxp, pos, fee_rate
|
|
|
|
def static_fee(self, i):
    """Return the entry of FEERATE_STATIC_VALUES at index `i`."""
    static_values = FEERATE_STATIC_VALUES
    return static_values[i]
|
|
|
|
def static_fee_index(self, fee_per_kb: Optional[int]) -> int:
    """Return the index of the static fee value closest to fee_per_kb.

    On a tie the lowest index wins. Raises TypeError for None.
    """
    if fee_per_kb is None:
        raise TypeError('static fee cannot be None')
    distances = [abs(value - fee_per_kb) for value in FEERATE_STATIC_VALUES]
    return min(range(len(distances)), key=lambda idx: distances[idx])
|
|
|
|
def has_fee_etas(self):
    """Return True when exactly four fee estimates are stored."""
    estimate_count = len(self.fee_estimates)
    return estimate_count == 4
|
|
|
|
def has_fee_mempool(self) -> bool:
    """Return True when mempool fee data has been set (is not None)."""
    histogram = self.mempool_fees
    return histogram is not None
|
|
|
|
def has_dynamic_fees_ready(self):
    """Return whether the data for the selected dynamic fee method is available."""
    readiness_check = self.has_fee_mempool if self.use_mempool_fees() else self.has_fee_etas
    return readiness_check()
|
|
|
|
def is_dynfee(self):
    """Return the 'dynamic_fees' config value as a bool (default True)."""
    setting = self.get('dynamic_fees', True)
    return bool(setting)
|
|
|
|
def use_mempool_fees(self):
    """Return the 'mempool_fees' config value as a bool (default False)."""
    setting = self.get('mempool_fees', False)
    return bool(setting)
|
|
|
|
def _feerate_from_fractional_slider_position(self, fee_level: float, dyn: bool,
                                             mempool: bool) -> Union[int, None]:
    """Map a fractional slider position to a fee rate.

    fee_level: clamped to [0, 1], then scaled to the slider's integer
        positions and rounded.
    dyn: use dynamic estimates; otherwise FEERATE_STATIC_VALUES.
    mempool: with dynamic estimates, use mempool depth instead of ETA.
    """
    fee_level = min(max(fee_level, 0), 1)
    if not dyn:
        max_pos = len(FEERATE_STATIC_VALUES) - 1
        slider_pos = round(fee_level * max_pos)
        return FEERATE_STATIC_VALUES[slider_pos]
    if mempool:
        max_pos = len(FEE_DEPTH_TARGETS) - 1
    else:
        max_pos = len(FEE_ETA_TARGETS)
    slider_pos = round(fee_level * max_pos)
    if mempool:
        return self.depth_to_fee(slider_pos)
    return self.eta_to_fee(slider_pos)
|
|
|
|
def fee_per_kb(self, dyn: bool=None, mempool: bool=None, fee_level: float=None) -> Optional[int]:
    """Returns sat/kvB fee to pay for a txn.
    Note: might return None.

    dyn, mempool: override the configured fee method; None means use
        the config (is_dynfee() / use_mempool_fees()).
    fee_level: float between 0.0 and 1.0, representing fee slider position
    """
    if constants.net is constants.BitcoinRegtest:
        return FEERATE_REGTEST_HARDCODED
    use_dynamic = self.is_dynfee() if dyn is None else dyn
    use_mempool = self.use_mempool_fees() if mempool is None else mempool
    if fee_level is not None:
        return self._feerate_from_fractional_slider_position(fee_level, use_dynamic, use_mempool)
    # there is no fee_level specified; will use config.
    # note: 'depth_level' and 'fee_level' in config are integer slider positions,
    # unlike fee_level here, which (when given) is a float in [0.0, 1.0]
    if not use_dynamic:
        fee_rate = self.get('fee_per_kb', FEERATE_FALLBACK_STATIC_FEE)
    elif use_mempool:
        fee_rate = self.depth_to_fee(self.get_depth_level())
    else:
        fee_rate = self.eta_to_fee(self.get_fee_level())
    return None if fee_rate is None else int(fee_rate)
|
|
|
|
def fee_per_byte(self):
    """Returns sat/vB fee to pay for a txn.
    Note: might return None.
    """
    rate_per_kb = self.fee_per_kb()
    if rate_per_kb is None:
        return None
    return rate_per_kb / 1000
|
|
|
|
def estimate_fee(self, size: Union[int, float, Decimal], *,
                 allow_fallback_to_static_rates: bool = False) -> int:
    """Estimate the fee for a transaction of the given size.

    Uses the rate from fee_per_kb(). When that is None, either falls
    back to FEERATE_FALLBACK_STATIC_FEE (if allowed) or raises
    NoDynamicFeeEstimates.
    """
    rate = self.fee_per_kb()
    if rate is None:
        if not allow_fallback_to_static_rates:
            raise NoDynamicFeeEstimates()
        rate = FEERATE_FALLBACK_STATIC_FEE
    return self.estimate_fee_for_feerate(rate, size)
|
|
|
|
@classmethod
def estimate_fee_for_feerate(cls, fee_per_kb: Union[int, float, Decimal],
                             size: Union[int, float, Decimal]) -> int:
    """Return the rounded fee for `size` at the given per-kilobyte rate.

    The per-byte rate is passed through quantize_feerate() before
    multiplying.
    """
    size = Decimal(size)
    per_kb = Decimal(fee_per_kb)
    # to be consistent with what is displayed in the GUI,
    # the calculation needs to use the same precision:
    per_byte = quantize_feerate(per_kb / 1000)
    return round(per_byte * size)
|
|
|
|
def update_fee_estimates(self, nblock_target: int, fee_per_kb: int):
    """Store the fee estimate for a confirmation target.

    Both arguments are asserted to be ints.
    """
    for int_arg in (nblock_target, fee_per_kb):
        assert isinstance(int_arg, int), f"expected int, got {int_arg!r}"
    self.fee_estimates[nblock_target] = fee_per_kb
|
|
|
|
def is_fee_estimates_update_required(self):
    """Checks time since last requested and updated fee estimates.
    Returns True if an update should be requested.

    An update is due when more than 60 seconds have passed since the
    last request.
    """
    elapsed = time.time() - self.last_time_fee_estimates_requested
    return elapsed > 60
|
|
|
|
def requested_fee_estimates(self):
    """Record the current time as the moment fee estimates were last requested."""
    now = time.time()
    self.last_time_fee_estimates_requested = now
|
|
|
|
def get_video_device(self):
    """Return the 'video_device' config value, with 'default' mapped to ''."""
    configured = self.get("video_device", "default")
    return '' if configured == 'default' else configured
|
|
|
|
def get_ssl_context(self):
    """Return an SSL context loaded with the configured key and certificate.

    Returns None unless both 'ssl_keyfile' and 'ssl_certfile' are set.
    """
    keyfile = self.get('ssl_keyfile')
    certfile = self.get('ssl_certfile')
    if not (keyfile and certfile):
        return None
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile, keyfile)
    return context
|
|
|
|
def get_ssl_domain(self):
    """Return the result of check_ssl_config() for this config.

    Returns None unless both 'ssl_keyfile' and 'ssl_certfile' are set.
    """
    from .paymentrequest import check_ssl_config
    ssl_configured = self.get('ssl_keyfile') and self.get('ssl_certfile')
    if not ssl_configured:
        return None
    return check_ssl_config(self)
|
|
|
|
def get_netaddress(self, key: str) -> Optional[NetAddress]:
    """Parse the config value stored under `key` as a network address.

    Returns the NetAddress parsed from the stored string, or None when
    the value is unset/empty or cannot be parsed.
    """
    text = self.get(key)
    if not text:
        return None
    try:
        return NetAddress.from_string(text)
    except Exception:
        # Best effort: an unparseable value is treated as "not set".
        # Not a bare except, so KeyboardInterrupt/SystemExit propagate.
        return None
|
|
|
|
def format_amount(self, x, is_diff=False, whitespaces=False):
    """Format amount `x` with format_satoshis(), using this config's
    number-formatting settings.
    """
    formatting_options = dict(
        num_zeros=self.num_zeros,
        decimal_point=self.decimal_point,
        is_diff=is_diff,
        whitespaces=whitespaces,
        precision=self.amt_precision_post_satoshi,
        add_thousands_sep=self.amt_add_thousands_sep,
    )
    return format_satoshis(x, **formatting_options)
|
|
|
|
def format_amount_and_units(self, amount):
    """Return the formatted amount followed by a space and the base unit name."""
    amount_text = self.format_amount(amount)
    unit_name = self.get_base_unit()
    return amount_text + ' ' + unit_name
|
|
|
|
def format_fee_rate(self, fee_rate):
    """Format a per-kilobyte fee rate as a per-byte string ending in ' sat/byte'."""
    per_byte = fee_rate/1000
    rate_text = format_fee_satoshis(per_byte, num_zeros=self.num_zeros)
    return rate_text + ' sat/byte'
|
|
|
|
def get_base_unit(self):
    """Return the base unit name corresponding to self.decimal_point."""
    unit_name = decimal_point_to_base_unit_name(self.decimal_point)
    return unit_name
|
|
|
|
def set_base_unit(self, unit):
    """Switch to the given base unit and store its decimal point.

    `unit` is asserted to be a key of base_units.
    """
    assert unit in base_units.keys()
    new_decimal_point = base_unit_name_to_decimal_point(unit)
    self.decimal_point = new_decimal_point
    self.set_key('decimal_point', new_decimal_point, True)
|
|
|
|
def get_decimal_point(self):
    """Return the current decimal point setting (self.decimal_point)."""
    decimal_point = self.decimal_point
    return decimal_point
|
|
|
|
|
|
def read_user_config(path):
    """Read the user config from the "config" file inside directory `path`.

    path: the data directory; a falsy value yields an empty config.

    Returns the parsed JSON object as a dict. Returns {} when the file
    does not exist, cannot be read or parsed (a warning is logged), or
    does not contain a JSON object.
    """
    if not path:
        return {}
    config_path = os.path.join(path, "config")
    if not os.path.exists(config_path):
        return {}
    try:
        with open(config_path, "r", encoding='utf-8') as f:
            data = f.read()
        result = json.loads(data)
    except Exception:
        # Unreadable or corrupt file: fall back to an empty config.
        # Not a bare except, so KeyboardInterrupt/SystemExit propagate.
        _logger.warning(f"Cannot read config file. {config_path}")
        return {}
    if not isinstance(result, dict):
        return {}
    return result
|
|
|