@ -6,6 +6,7 @@ from pyln.testing.btcproxy import BitcoinRpcProxy
from collections import OrderedDict
from collections import OrderedDict
from decimal import Decimal
from decimal import Decimal
from ephemeral_port_reserve import reserve # type: ignore
from ephemeral_port_reserve import reserve # type: ignore
from filelock import FileLock
from pyln . client import LightningRpc
from pyln . client import LightningRpc
from pyln . client import Millisatoshi
from pyln . client import Millisatoshi
@ -14,6 +15,7 @@ import logging
import lzma
import lzma
import math
import math
import os
import os
import psutil # type: ignore
import random
import random
import re
import re
import shutil
import shutil
@ -1038,10 +1040,59 @@ class LightningNode(object):
return msgs
return msgs
class Throttler ( object ) :
""" Throttles the creation of system-processes to avoid overload.
There is no reason to overload the system with too many processes
being spawned or run at the same time . It causes timeouts by
aggressively preempting processes and swapping if the memory limit is
reached . In order to reduce this loss of performance we provide a
` wait ( ) ` method which will serialize the creation of processes , but
also delay if the system load is too high .
Notice that technically we are throttling too late , i . e . , we react
to an overload , but chances are pretty good that some other
already running process is about to terminate , and so the overload
is short - lived . We throttle when the process object is first
created , not when restarted , in order to avoid delaying running
tests , which could cause more timeouts .
"""
def __init__ ( self , target : float = 75 ) :
""" If specified we try to stick to a load of target (in percent).
"""
self . target = target
self . lock = FileLock ( " /tmp/ltest.lock " )
self . current_load = self . target # Start slow
psutil . cpu_percent ( ) # Prime the internal load metric
def wait ( self ) :
start_time = time . time ( )
with self . lock . acquire ( poll_intervall = 0.250 ) :
# We just got the lock, assume someone else just released it
self . current_load = 100
while self . load ( ) > = self . target :
time . sleep ( 1 )
delay = time . time ( ) - start_time
with open ( " /tmp/ltest-throttler.csv " , " a " ) as f :
f . write ( " {} , {} , {} , {} \n " . format ( time . time ( ) , self . load ( ) , self . target , delay ) )
self . current_load = 100 # Back off slightly to avoid triggering right away
def load ( self ) :
""" An exponential moving average of the load
"""
decay = 0.5
load = psutil . cpu_percent ( )
self . current_load = decay * load + ( 1 - decay ) * self . current_load
return self . current_load
class NodeFactory ( object ) :
class NodeFactory ( object ) :
""" A factory to setup and start `lightningd` daemons.
""" A factory to setup and start `lightningd` daemons.
"""
"""
def __init__ ( self , request , testname , bitcoind , executor , directory , db_provider , node_cls ) :
def __init__ ( self , request , testname , bitcoind , executor , directory ,
db_provider , node_cls , throttler ) :
if request . node . get_closest_marker ( " slow_test " ) and SLOW_MACHINE :
if request . node . get_closest_marker ( " slow_test " ) and SLOW_MACHINE :
self . valgrind = False
self . valgrind = False
else :
else :
@ -1055,6 +1106,7 @@ class NodeFactory(object):
self . lock = threading . Lock ( )
self . lock = threading . Lock ( )
self . db_provider = db_provider
self . db_provider = db_provider
self . node_cls = node_cls
self . node_cls = node_cls
self . throttler = throttler
def split_options ( self , opts ) :
def split_options ( self , opts ) :
""" Split node options from cli options
""" Split node options from cli options
@ -1115,7 +1167,7 @@ class NodeFactory(object):
feerates = ( 15000 , 11000 , 7500 , 3750 ) , start = True ,
feerates = ( 15000 , 11000 , 7500 , 3750 ) , start = True ,
wait_for_bitcoind_sync = True , may_fail = False ,
wait_for_bitcoind_sync = True , may_fail = False ,
expect_fail = False , cleandir = True , * * kwargs ) :
expect_fail = False , cleandir = True , * * kwargs ) :
self . throttler . wait ( )
node_id = self . get_node_id ( ) if not node_id else node_id
node_id = self . get_node_id ( ) if not node_id else node_id
port = self . get_next_port ( )
port = self . get_next_port ( )