|
|
@ -37,7 +37,7 @@ from enum import IntEnum |
|
|
|
|
|
|
|
from .sql_db import SqlDB, sql |
|
|
|
from . import constants, util |
|
|
|
from .util import bh2u, profiler, get_headers_dir, bfh, is_ip_address, list_enabled_bits |
|
|
|
from .util import bh2u, profiler, get_headers_dir, is_ip_address, json_normalize |
|
|
|
from .logging import Logger |
|
|
|
from .lnutil import (LNPeerAddr, format_short_channel_id, ShortChannelID, |
|
|
|
validate_features, IncompatibleOrInsaneFeatures) |
|
|
@ -52,6 +52,15 @@ if TYPE_CHECKING: |
|
|
|
FLAG_DISABLE = 1 << 1 |
|
|
|
FLAG_DIRECTION = 1 << 0 |
|
|
|
|
|
|
|
|
|
|
|
class NodeAddress(NamedTuple): |
|
|
|
"""Holds address information of Lightning nodes |
|
|
|
and how up to date this info is.""" |
|
|
|
host: str |
|
|
|
port: int |
|
|
|
timestamp: int |
|
|
|
|
|
|
|
|
|
|
|
class ChannelInfo(NamedTuple): |
|
|
|
short_channel_id: ShortChannelID |
|
|
|
node1_id: bytes |
|
|
@ -123,7 +132,6 @@ class Policy(NamedTuple): |
|
|
|
return self.key[8:] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NodeInfo(NamedTuple): |
|
|
|
node_id: bytes |
|
|
|
features: int |
|
|
@ -262,7 +270,7 @@ class ChannelDB(SqlDB): |
|
|
|
self._policies = {} # type: Dict[Tuple[bytes, ShortChannelID], Policy] # (node_id, scid) -> Policy |
|
|
|
self._nodes = {} # type: Dict[bytes, NodeInfo] # node_id -> NodeInfo |
|
|
|
# node_id -> (host, port, ts) |
|
|
|
self._addresses = defaultdict(set) # type: Dict[bytes, Set[Tuple[str, int, int]]] |
|
|
|
self._addresses = defaultdict(set) # type: Dict[bytes, Set[NodeAddress]] |
|
|
|
self._channels_for_node = defaultdict(set) # type: Dict[bytes, Set[ShortChannelID]] |
|
|
|
self._recent_peers = [] # type: List[bytes] # list of node_ids |
|
|
|
self._chans_with_0_policies = set() # type: Set[ShortChannelID] |
|
|
@ -287,7 +295,7 @@ class ChannelDB(SqlDB): |
|
|
|
now = int(time.time()) |
|
|
|
node_id = peer.pubkey |
|
|
|
with self.lock: |
|
|
|
self._addresses[node_id].add((peer.host, peer.port, now)) |
|
|
|
self._addresses[node_id].add(NodeAddress(peer.host, peer.port, now)) |
|
|
|
# list is ordered |
|
|
|
if node_id in self._recent_peers: |
|
|
|
self._recent_peers.remove(node_id) |
|
|
@ -304,10 +312,9 @@ class ChannelDB(SqlDB): |
|
|
|
r = self._addresses.get(node_id) |
|
|
|
if not r: |
|
|
|
return None |
|
|
|
addr = sorted(list(r), key=lambda x: x[2])[0] |
|
|
|
host, port, timestamp = addr |
|
|
|
addr = sorted(list(r), key=lambda x: x.timestamp)[0] |
|
|
|
try: |
|
|
|
return LNPeerAddr(host, port, node_id) |
|
|
|
return LNPeerAddr(addr.host, addr.port, node_id) |
|
|
|
except ValueError: |
|
|
|
return None |
|
|
|
|
|
|
@ -549,7 +556,7 @@ class ChannelDB(SqlDB): |
|
|
|
self._db_save_node_info(node_id, msg_payload['raw']) |
|
|
|
with self.lock: |
|
|
|
for addr in node_addresses: |
|
|
|
self._addresses[node_id].add((addr.host, addr.port, 0)) |
|
|
|
self._addresses[node_id].add(NodeAddress(addr.host, addr.port, 0)) |
|
|
|
self._db_save_node_addresses(node_addresses) |
|
|
|
|
|
|
|
self.logger.debug("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) |
|
|
@ -613,11 +620,11 @@ class ChannelDB(SqlDB): |
|
|
|
c.execute("""SELECT * FROM address""") |
|
|
|
for x in c: |
|
|
|
node_id, host, port, timestamp = x |
|
|
|
self._addresses[node_id].add((str(host), int(port), int(timestamp or 0))) |
|
|
|
self._addresses[node_id].add(NodeAddress(str(host), int(port), int(timestamp or 0))) |
|
|
|
def newest_ts_for_node_id(node_id): |
|
|
|
newest_ts = 0 |
|
|
|
for host, port, ts in self._addresses[node_id]: |
|
|
|
newest_ts = max(newest_ts, ts) |
|
|
|
for addr in self._addresses[node_id]: |
|
|
|
newest_ts = max(newest_ts, addr.timestamp) |
|
|
|
return newest_ts |
|
|
|
sorted_node_ids = sorted(self._addresses.keys(), key=newest_ts_for_node_id, reverse=True) |
|
|
|
self._recent_peers = sorted_node_ids[:self.NUM_MAX_RECENT_PEERS] |
|
|
@ -750,3 +757,36 @@ class ChannelDB(SqlDB): |
|
|
|
|
|
|
|
def get_node_info_for_node_id(self, node_id: bytes) -> Optional['NodeInfo']: |
|
|
|
return self._nodes.get(node_id) |
|
|
|
|
|
|
|
def to_dict(self) -> dict: |
|
|
|
""" Generates a graph representation in terms of a dictionary. |
|
|
|
|
|
|
|
The dictionary contains only native python types and can be encoded |
|
|
|
to json. |
|
|
|
""" |
|
|
|
with self.lock: |
|
|
|
graph = {'nodes': [], 'channels': []} |
|
|
|
|
|
|
|
# gather nodes |
|
|
|
for pk, nodeinfo in self._nodes.items(): |
|
|
|
# use _asdict() to convert NamedTuples to json encodable dicts |
|
|
|
graph['nodes'].append( |
|
|
|
nodeinfo._asdict(), |
|
|
|
) |
|
|
|
graph['nodes'][-1]['addresses'] = [addr._asdict() for addr in self._addresses[pk]] |
|
|
|
|
|
|
|
# gather channels |
|
|
|
for cid, channelinfo in self._channels.items(): |
|
|
|
graph['channels'].append( |
|
|
|
channelinfo._asdict(), |
|
|
|
) |
|
|
|
policy1 = self._policies.get( |
|
|
|
(channelinfo.node1_id, channelinfo.short_channel_id)) |
|
|
|
policy2 = self._policies.get( |
|
|
|
(channelinfo.node2_id, channelinfo.short_channel_id)) |
|
|
|
graph['channels'][-1]['policy1'] = policy1._asdict() if policy1 else None |
|
|
|
graph['channels'][-1]['policy2'] = policy2._asdict() if policy2 else None |
|
|
|
|
|
|
|
# need to use json_normalize otherwise json encoding in rpc server fails |
|
|
|
graph = json_normalize(graph) |
|
|
|
return graph |
|
|
|