|
|
@ -111,6 +111,12 @@ class Policy(NamedTuple): |
|
|
|
timestamp = int.from_bytes(payload['timestamp'], "big") |
|
|
|
) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_raw_msg(key:bytes, raw: bytes) -> 'Policy': |
|
|
|
payload = decode_msg(raw)[1] |
|
|
|
payload['start_node'] = key[8:] |
|
|
|
return Policy.from_msg(payload) |
|
|
|
|
|
|
|
def is_disabled(self): |
|
|
|
return self.channel_flags & FLAG_DISABLE |
|
|
|
|
|
|
@ -141,6 +147,11 @@ class NodeInfo(NamedTuple): |
|
|
|
return NodeInfo(node_id=node_id, features=features, timestamp=timestamp, alias=alias), [ |
|
|
|
Address(host=host, port=port, node_id=node_id, last_connected_date=None) for host, port in addresses] |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def from_raw_msg(raw: bytes) -> 'NodeInfo': |
|
|
|
payload_dict = decode_msg(raw)[1] |
|
|
|
return NodeInfo.from_msg(payload_dict) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def parse_addresses_field(addresses_field): |
|
|
|
buf = addresses_field |
|
|
@ -197,35 +208,24 @@ class CategorizedChannelUpdates(NamedTuple): |
|
|
|
to_delete: List # database entries to delete |
|
|
|
|
|
|
|
|
|
|
|
# TODO It would make more sense to store the raw gossip messages in the db. |
|
|
|
# That is pretty much a pre-requisite of actively participating in gossip. |
|
|
|
|
|
|
|
create_channel_info = """ |
|
|
|
CREATE TABLE IF NOT EXISTS channel_info ( |
|
|
|
short_channel_id VARCHAR(64), |
|
|
|
node1_id VARCHAR(66), |
|
|
|
node2_id VARCHAR(66), |
|
|
|
capacity_sat INTEGER, |
|
|
|
short_channel_id BLOB(8), |
|
|
|
msg BLOB, |
|
|
|
PRIMARY KEY(short_channel_id) |
|
|
|
)""" |
|
|
|
|
|
|
|
create_policy = """ |
|
|
|
CREATE TABLE IF NOT EXISTS policy ( |
|
|
|
key VARCHAR(66), |
|
|
|
cltv_expiry_delta INTEGER NOT NULL, |
|
|
|
htlc_minimum_msat INTEGER NOT NULL, |
|
|
|
htlc_maximum_msat INTEGER, |
|
|
|
fee_base_msat INTEGER NOT NULL, |
|
|
|
fee_proportional_millionths INTEGER NOT NULL, |
|
|
|
channel_flags INTEGER NOT NULL, |
|
|
|
message_flags INTEGER NOT NULL, |
|
|
|
timestamp INTEGER NOT NULL, |
|
|
|
key BLOB(41), |
|
|
|
msg BLOB, |
|
|
|
PRIMARY KEY(key) |
|
|
|
)""" |
|
|
|
|
|
|
|
create_address = """ |
|
|
|
CREATE TABLE IF NOT EXISTS address ( |
|
|
|
node_id VARCHAR(66), |
|
|
|
node_id BLOB(33), |
|
|
|
host STRING(256), |
|
|
|
port INTEGER NOT NULL, |
|
|
|
timestamp INTEGER, |
|
|
@ -234,10 +234,8 @@ PRIMARY KEY(node_id, host, port) |
|
|
|
|
|
|
|
create_node_info = """ |
|
|
|
CREATE TABLE IF NOT EXISTS node_info ( |
|
|
|
node_id VARCHAR(66), |
|
|
|
features INTEGER NOT NULL, |
|
|
|
timestamp INTEGER NOT NULL, |
|
|
|
alias STRING(64), |
|
|
|
node_id BOB(33), |
|
|
|
msg BLOB, |
|
|
|
PRIMARY KEY(node_id) |
|
|
|
)""" |
|
|
|
|
|
|
@ -247,7 +245,7 @@ class ChannelDB(SqlDB): |
|
|
|
NUM_MAX_RECENT_PEERS = 20 |
|
|
|
|
|
|
|
def __init__(self, network: 'Network'): |
|
|
|
path = os.path.join(get_headers_dir(network.config), 'channel_db') |
|
|
|
path = os.path.join(get_headers_dir(network.config), 'gossip_db') |
|
|
|
super().__init__(network, path, commit_interval=100) |
|
|
|
self.num_nodes = 0 |
|
|
|
self.num_channels = 0 |
|
|
@ -341,7 +339,7 @@ class ChannelDB(SqlDB): |
|
|
|
self._channels[channel_info.short_channel_id] = channel_info |
|
|
|
self._channels_for_node[channel_info.node1_id].add(channel_info.short_channel_id) |
|
|
|
self._channels_for_node[channel_info.node2_id].add(channel_info.short_channel_id) |
|
|
|
self.save_channel(channel_info) |
|
|
|
self.save_channel(channel_info.short_channel_id, msg['raw']) |
|
|
|
|
|
|
|
def print_change(self, old_policy: Policy, new_policy: Policy): |
|
|
|
# print what changed between policies |
|
|
@ -399,7 +397,7 @@ class ChannelDB(SqlDB): |
|
|
|
self.verify_channel_update(payload) |
|
|
|
policy = Policy.from_msg(payload) |
|
|
|
self._policies[key] = policy |
|
|
|
self.save_policy(policy) |
|
|
|
self.save_policy(policy.key, payload['raw']) |
|
|
|
# |
|
|
|
self.update_counts() |
|
|
|
return CategorizedChannelUpdates( |
|
|
@ -423,9 +421,9 @@ class ChannelDB(SqlDB): |
|
|
|
self.conn.commit() |
|
|
|
|
|
|
|
@sql |
|
|
|
def save_policy(self, policy): |
|
|
|
def save_policy(self, key, msg): |
|
|
|
c = self.conn.cursor() |
|
|
|
c.execute("""REPLACE INTO policy (key, cltv_expiry_delta, htlc_minimum_msat, htlc_maximum_msat, fee_base_msat, fee_proportional_millionths, channel_flags, message_flags, timestamp) VALUES (?,?,?,?,?,?,?,?,?)""", list(policy)) |
|
|
|
c.execute("""REPLACE INTO policy (key, msg) VALUES (?,?)""", [key, msg]) |
|
|
|
|
|
|
|
@sql |
|
|
|
def delete_policy(self, node_id, short_channel_id): |
|
|
@ -434,9 +432,9 @@ class ChannelDB(SqlDB): |
|
|
|
c.execute("""DELETE FROM policy WHERE key=?""", (key,)) |
|
|
|
|
|
|
|
@sql |
|
|
|
def save_channel(self, channel_info): |
|
|
|
def save_channel(self, short_channel_id, msg): |
|
|
|
c = self.conn.cursor() |
|
|
|
c.execute("REPLACE INTO channel_info (short_channel_id, node1_id, node2_id, capacity_sat) VALUES (?,?,?,?)", list(channel_info)) |
|
|
|
c.execute("REPLACE INTO channel_info (short_channel_id, msg) VALUES (?,?)", [short_channel_id, msg]) |
|
|
|
|
|
|
|
@sql |
|
|
|
def delete_channel(self, short_channel_id): |
|
|
@ -444,9 +442,9 @@ class ChannelDB(SqlDB): |
|
|
|
c.execute("""DELETE FROM channel_info WHERE short_channel_id=?""", (short_channel_id,)) |
|
|
|
|
|
|
|
@sql |
|
|
|
def save_node(self, node_info): |
|
|
|
def save_node_info(self, node_id, msg): |
|
|
|
c = self.conn.cursor() |
|
|
|
c.execute("REPLACE INTO node_info (node_id, features, timestamp, alias) VALUES (?,?,?,?)", list(node_info)) |
|
|
|
c.execute("REPLACE INTO node_info (node_id, msg) VALUES (?,?)", [node_id, msg]) |
|
|
|
|
|
|
|
@sql |
|
|
|
def save_node_address(self, node_id, peer, now): |
|
|
@ -493,7 +491,7 @@ class ChannelDB(SqlDB): |
|
|
|
continue |
|
|
|
# save |
|
|
|
self._nodes[node_id] = node_info |
|
|
|
self.save_node(node_info) |
|
|
|
self.save_node_info(node_id, msg_payload['raw']) |
|
|
|
for addr in node_addresses: |
|
|
|
self._addresses[node_id].add((addr.host, addr.port, 0)) |
|
|
|
self.save_node_addresses(node_id, node_addresses) |
|
|
@ -553,17 +551,17 @@ class ChannelDB(SqlDB): |
|
|
|
node_id, host, port, timestamp = x |
|
|
|
self._addresses[node_id].add((str(host), int(port), int(timestamp or 0))) |
|
|
|
c.execute("""SELECT * FROM channel_info""") |
|
|
|
for x in c: |
|
|
|
x = (ShortChannelID.normalize(x[0]), *x[1:]) |
|
|
|
ci = ChannelInfo(*x) |
|
|
|
self._channels[ci.short_channel_id] = ci |
|
|
|
for short_channel_id, msg in c: |
|
|
|
ci = ChannelInfo.from_raw_msg(msg) |
|
|
|
self._channels[short_channel_id] = ci |
|
|
|
c.execute("""SELECT * FROM node_info""") |
|
|
|
for x in c: |
|
|
|
ni = NodeInfo(*x) |
|
|
|
self._nodes[ni.node_id] = ni |
|
|
|
for node_id, msg in c: |
|
|
|
node_info, node_addresses = NodeInfo.from_raw_msg(msg) |
|
|
|
# don't load node_addresses because they dont have timestamps |
|
|
|
self._nodes[node_id] = node_info |
|
|
|
c.execute("""SELECT * FROM policy""") |
|
|
|
for x in c: |
|
|
|
p = Policy(*x) |
|
|
|
for key, msg in c: |
|
|
|
p = Policy.from_raw_msg(key, msg) |
|
|
|
self._policies[(p.start_node, p.short_channel_id)] = p |
|
|
|
for channel_info in self._channels.values(): |
|
|
|
self._channels_for_node[channel_info.node1_id].add(channel_info.short_channel_id) |
|
|
|