diff --git a/contrib/pyln-testing/pyln/testing/fixtures.py b/contrib/pyln-testing/pyln/testing/fixtures.py
index ea6ccdc5f..ae5fcd51f 100644
--- a/contrib/pyln-testing/pyln/testing/fixtures.py
+++ b/contrib/pyln-testing/pyln/testing/fixtures.py
@@ -1,6 +1,6 @@
 from concurrent import futures
 from pyln.testing.db import SqliteDbProvider, PostgresDbProvider
-from pyln.testing.utils import NodeFactory, BitcoinD, ElementsD, env, DEVELOPER, LightningNode, TEST_DEBUG
+from pyln.testing.utils import NodeFactory, BitcoinD, ElementsD, env, DEVELOPER, LightningNode, TEST_DEBUG, Throttler
 from typing import Dict
 
 import logging
@@ -198,7 +198,12 @@ def teardown_checks(request):
 
 
 @pytest.fixture
-def node_factory(request, directory, test_name, bitcoind, executor, db_provider, teardown_checks, node_cls):
+def throttler():
+    yield Throttler()
+
+
+@pytest.fixture
+def node_factory(request, directory, test_name, bitcoind, executor, db_provider, teardown_checks, node_cls, throttler):
     nf = NodeFactory(
         request,
         test_name,
@@ -206,7 +211,8 @@ def node_factory(request, directory, test_name, bitcoind, executor, db_provider,
         executor,
         directory=directory,
         db_provider=db_provider,
-        node_cls=node_cls
+        node_cls=node_cls,
+        throttler=throttler,
     )
 
     yield nf
diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py
index d60c0eba1..66fe4c47d 100644
--- a/contrib/pyln-testing/pyln/testing/utils.py
+++ b/contrib/pyln-testing/pyln/testing/utils.py
@@ -6,6 +6,7 @@ from pyln.testing.btcproxy import BitcoinRpcProxy
 from collections import OrderedDict
 from decimal import Decimal
 from ephemeral_port_reserve import reserve  # type: ignore
+from filelock import FileLock
 from pyln.client import LightningRpc
 from pyln.client import Millisatoshi
 
@@ -14,6 +15,7 @@ import logging
 import lzma
 import math
 import os
+import psutil  # type: ignore
 import random
 import re
 import shutil
@@ -1038,10 +1040,72 @@ class LightningNode(object):
         return msgs
 
 
+class Throttler(object):
+    """Throttles the creation of system-processes to avoid overload.
+
+    There is no reason to overload the system with too many processes
+    being spawned or run at the same time. It causes timeouts by
+    aggressively preempting processes and swapping if the memory limit is
+    reached. In order to reduce this loss of performance we provide a
+    `wait()` method which will serialize the creation of processes, but
+    also delay if the system load is too high.
+
+    Notice that technically we are throttling too late, i.e., we react
+    to an overload, but chances are pretty good that some other
+    already running process is about to terminate, and so the overload
+    is short-lived. We throttle when the process object is first
+    created, not when restarted, in order to avoid delaying running
+    tests, which could cause more timeouts.
+
+    """
+    def __init__(self, target: float = 75):
+        """If specified we try to stick to a load of target (in percent).
+        """
+        self.target = target
+        self.lock = FileLock("/tmp/ltest.lock")
+        self.current_load = self.target  # Start slow
+        psutil.cpu_percent()  # Prime the internal load metric
+
+    def wait(self):
+        """Serialize callers, then hold until the load is below target.
+
+        Callers queue on the file lock. The holder sleeps in one second
+        steps until `load()` reports less than `self.target`, then appends
+        a `timestamp, load, target, delay` row to the throttler CSV.
+        """
+        start_time = time.time()
+        # `poll_intervall` (sic) matches the keyword of the filelock 3.0.*
+        # pin in requirements.txt; check the installed release before renaming.
+        with self.lock.acquire(poll_intervall=0.250):
+            # We just got the lock, assume someone else just released it
+            self.current_load = 100
+            load = self.load()
+            while load >= self.target:
+                time.sleep(1)
+                load = self.load()
+
+            delay = time.time() - start_time
+            with open("/tmp/ltest-throttler.csv", "a") as f:
+                # Log the sample that let us through. Calling `load()` again
+                # here would re-read the CPU counters right after the previous
+                # read, an interval too short for a meaningful percentage.
+                f.write("{}, {}, {}, {}\n".format(time.time(), load, self.target, delay))
+            self.current_load = 100  # Back off slightly to avoid triggering right away
+
+    def load(self):
+        """An exponential moving average of the load
+        """
+        decay = 0.5
+        load = psutil.cpu_percent()
+        self.current_load = decay * load + (1 - decay) * self.current_load
+        return self.current_load
+
+
 class NodeFactory(object):
     """A factory to setup and start `lightningd` daemons.
     """
-    def __init__(self, request, testname, bitcoind, executor, directory, db_provider, node_cls):
+    def __init__(self, request, testname, bitcoind, executor, directory,
+                 db_provider, node_cls, throttler):
         if request.node.get_closest_marker("slow_test") and SLOW_MACHINE:
             self.valgrind = False
         else:
@@ -1055,6 +1119,7 @@ class NodeFactory(object):
         self.lock = threading.Lock()
         self.db_provider = db_provider
         self.node_cls = node_cls
+        self.throttler = throttler
 
     def split_options(self, opts):
         """Split node options from cli options
@@ -1115,7 +1180,7 @@ class NodeFactory(object):
                  feerates=(15000, 11000, 7500, 3750), start=True,
                  wait_for_bitcoind_sync=True, may_fail=False,
                  expect_fail=False, cleandir=True, **kwargs):
-
+        self.throttler.wait()
         node_id = self.get_node_id() if not node_id else node_id
         port = self.get_next_port()
 
diff --git a/contrib/pyln-testing/requirements.txt b/contrib/pyln-testing/requirements.txt
index b28809f2d..a4c3c9324 100644
--- a/contrib/pyln-testing/requirements.txt
+++ b/contrib/pyln-testing/requirements.txt
@@ -5,3 +5,5 @@ cheroot==8.2.1
 ephemeral-port-reserve==1.1.1
 python-bitcoinlib==0.10.2
 psycopg2-binary==2.8.4
+filelock==3.0.*
+psutil==5.7.*
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 5fb6ad39d..8a7e9c249 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -1,5 +1,5 @@
 from utils import DEVELOPER, TEST_NETWORK  # noqa: F401,F403
-from pyln.testing.fixtures import directory, test_base_dir, test_name, chainparams, node_factory, bitcoind, teardown_checks, db_provider, executor, setup_logging  # noqa: F401,F403
+from pyln.testing.fixtures import directory, test_base_dir, test_name, chainparams, node_factory, bitcoind, teardown_checks, throttler, db_provider, executor, setup_logging  # noqa: F401,F403
 from pyln.testing import utils
 from utils import COMPAT
 