@ -37,7 +37,7 @@ from enum import IntEnum
from . sql_db import SqlDB , sql
from . sql_db import SqlDB , sql
from . import constants , util
from . import constants , util
from . util import bh2u , profiler , get_headers_dir , bfh , is_ip_address , list_enabled_bits
from . util import bh2u , profiler , get_headers_dir , is_ip_address , json_normalize
from . logging import Logger
from . logging import Logger
from . lnutil import ( LNPeerAddr , format_short_channel_id , ShortChannelID ,
from . lnutil import ( LNPeerAddr , format_short_channel_id , ShortChannelID ,
validate_features , IncompatibleOrInsaneFeatures )
validate_features , IncompatibleOrInsaneFeatures )
@ -52,6 +52,15 @@ if TYPE_CHECKING:
FLAG_DISABLE = 1 << 1
FLAG_DISABLE = 1 << 1
FLAG_DIRECTION = 1 << 0
FLAG_DIRECTION = 1 << 0
class NodeAddress ( NamedTuple ) :
""" Holds address information of Lightning nodes
and how up to date this info is . """
host : str
port : int
timestamp : int
class ChannelInfo ( NamedTuple ) :
class ChannelInfo ( NamedTuple ) :
short_channel_id : ShortChannelID
short_channel_id : ShortChannelID
node1_id : bytes
node1_id : bytes
@ -123,7 +132,6 @@ class Policy(NamedTuple):
return self . key [ 8 : ]
return self . key [ 8 : ]
class NodeInfo ( NamedTuple ) :
class NodeInfo ( NamedTuple ) :
node_id : bytes
node_id : bytes
features : int
features : int
@ -262,7 +270,7 @@ class ChannelDB(SqlDB):
self . _policies = { } # type: Dict[Tuple[bytes, ShortChannelID], Policy] # (node_id, scid) -> Policy
self . _policies = { } # type: Dict[Tuple[bytes, ShortChannelID], Policy] # (node_id, scid) -> Policy
self . _nodes = { } # type: Dict[bytes, NodeInfo] # node_id -> NodeInfo
self . _nodes = { } # type: Dict[bytes, NodeInfo] # node_id -> NodeInfo
# node_id -> (host, port, ts)
# node_id -> (host, port, ts)
self . _addresses = defaultdict ( set ) # type: Dict[bytes, Set[Tuple[str, int, int] ]]
self . _addresses = defaultdict ( set ) # type: Dict[bytes, Set[NodeAddress ]]
self . _channels_for_node = defaultdict ( set ) # type: Dict[bytes, Set[ShortChannelID]]
self . _channels_for_node = defaultdict ( set ) # type: Dict[bytes, Set[ShortChannelID]]
self . _recent_peers = [ ] # type: List[bytes] # list of node_ids
self . _recent_peers = [ ] # type: List[bytes] # list of node_ids
self . _chans_with_0_policies = set ( ) # type: Set[ShortChannelID]
self . _chans_with_0_policies = set ( ) # type: Set[ShortChannelID]
@ -287,7 +295,7 @@ class ChannelDB(SqlDB):
now = int ( time . time ( ) )
now = int ( time . time ( ) )
node_id = peer . pubkey
node_id = peer . pubkey
with self . lock :
with self . lock :
self . _addresses [ node_id ] . add ( ( peer . host , peer . port , now ) )
self . _addresses [ node_id ] . add ( NodeAddress ( peer . host , peer . port , now ) )
# list is ordered
# list is ordered
if node_id in self . _recent_peers :
if node_id in self . _recent_peers :
self . _recent_peers . remove ( node_id )
self . _recent_peers . remove ( node_id )
@ -304,10 +312,9 @@ class ChannelDB(SqlDB):
r = self . _addresses . get ( node_id )
r = self . _addresses . get ( node_id )
if not r :
if not r :
return None
return None
addr = sorted ( list ( r ) , key = lambda x : x [ 2 ] ) [ 0 ]
addr = sorted ( list ( r ) , key = lambda x : x . timestamp ) [ 0 ]
host , port , timestamp = addr
try :
try :
return LNPeerAddr ( host , port , node_id )
return LNPeerAddr ( addr . host , addr . port , node_id )
except ValueError :
except ValueError :
return None
return None
@ -549,7 +556,7 @@ class ChannelDB(SqlDB):
self . _db_save_node_info ( node_id , msg_payload [ ' raw ' ] )
self . _db_save_node_info ( node_id , msg_payload [ ' raw ' ] )
with self . lock :
with self . lock :
for addr in node_addresses :
for addr in node_addresses :
self . _addresses [ node_id ] . add ( ( addr . host , addr . port , 0 ) )
self . _addresses [ node_id ] . add ( NodeAddress ( addr . host , addr . port , 0 ) )
self . _db_save_node_addresses ( node_addresses )
self . _db_save_node_addresses ( node_addresses )
self . logger . debug ( " on_node_announcement: %d / %d " % ( len ( new_nodes ) , len ( msg_payloads ) ) )
self . logger . debug ( " on_node_announcement: %d / %d " % ( len ( new_nodes ) , len ( msg_payloads ) ) )
@ -613,11 +620,11 @@ class ChannelDB(SqlDB):
c . execute ( """ SELECT * FROM address """ )
c . execute ( """ SELECT * FROM address """ )
for x in c :
for x in c :
node_id , host , port , timestamp = x
node_id , host , port , timestamp = x
self . _addresses [ node_id ] . add ( ( str ( host ) , int ( port ) , int ( timestamp or 0 ) ) )
self . _addresses [ node_id ] . add ( NodeAddress ( str ( host ) , int ( port ) , int ( timestamp or 0 ) ) )
def newest_ts_for_node_id ( node_id ) :
def newest_ts_for_node_id ( node_id ) :
newest_ts = 0
newest_ts = 0
for host , port , ts in self . _addresses [ node_id ] :
for addr in self . _addresses [ node_id ] :
newest_ts = max ( newest_ts , ts )
newest_ts = max ( newest_ts , addr . time stamp )
return newest_ts
return newest_ts
sorted_node_ids = sorted ( self . _addresses . keys ( ) , key = newest_ts_for_node_id , reverse = True )
sorted_node_ids = sorted ( self . _addresses . keys ( ) , key = newest_ts_for_node_id , reverse = True )
self . _recent_peers = sorted_node_ids [ : self . NUM_MAX_RECENT_PEERS ]
self . _recent_peers = sorted_node_ids [ : self . NUM_MAX_RECENT_PEERS ]
@ -750,3 +757,36 @@ class ChannelDB(SqlDB):
def get_node_info_for_node_id ( self , node_id : bytes ) - > Optional [ ' NodeInfo ' ] :
def get_node_info_for_node_id ( self , node_id : bytes ) - > Optional [ ' NodeInfo ' ] :
return self . _nodes . get ( node_id )
return self . _nodes . get ( node_id )
def to_dict ( self ) - > dict :
""" Generates a graph representation in terms of a dictionary.
The dictionary contains only native python types and can be encoded
to json .
"""
with self . lock :
graph = { ' nodes ' : [ ] , ' channels ' : [ ] }
# gather nodes
for pk , nodeinfo in self . _nodes . items ( ) :
# use _asdict() to convert NamedTuples to json encodable dicts
graph [ ' nodes ' ] . append (
nodeinfo . _asdict ( ) ,
)
graph [ ' nodes ' ] [ - 1 ] [ ' addresses ' ] = [ addr . _asdict ( ) for addr in self . _addresses [ pk ] ]
# gather channels
for cid , channelinfo in self . _channels . items ( ) :
graph [ ' channels ' ] . append (
channelinfo . _asdict ( ) ,
)
policy1 = self . _policies . get (
( channelinfo . node1_id , channelinfo . short_channel_id ) )
policy2 = self . _policies . get (
( channelinfo . node2_id , channelinfo . short_channel_id ) )
graph [ ' channels ' ] [ - 1 ] [ ' policy1 ' ] = policy1 . _asdict ( ) if policy1 else None
graph [ ' channels ' ] [ - 1 ] [ ' policy2 ' ] = policy2 . _asdict ( ) if policy2 else None
# need to use json_normalize otherwise json encoding in rpc server fails
graph = json_normalize ( graph )
return graph