|
|
@ -233,20 +233,6 @@ class ChannelDB(SqlDB): |
|
|
|
self.num_policies = self.DBSession.query(Policy).count() |
|
|
|
self.num_nodes = self.DBSession.query(NodeInfo).count() |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def purge_unknown_channels(self, channel_ids): |
|
|
|
ids = [x.hex() for x in channel_ids] |
|
|
|
missing = self.DBSession \ |
|
|
|
.query(ChannelInfo) \ |
|
|
|
.filter(not_(ChannelInfo.short_channel_id.in_(ids))) \ |
|
|
|
.all() |
|
|
|
if missing: |
|
|
|
self.logger.info("deleting {} channels".format(len(missing))) |
|
|
|
delete_query = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(ids))) |
|
|
|
self.DBSession.execute(delete_query) |
|
|
|
self.DBSession.commit() |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def compare_channels(self, channel_ids): |
|
|
@ -344,7 +330,7 @@ class ChannelDB(SqlDB): |
|
|
|
self.DBSession.add(channel_info) |
|
|
|
self.DBSession.commit() |
|
|
|
self._update_counts() |
|
|
|
self.logger.info('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads))) |
|
|
|
self.logger.debug('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads))) |
|
|
|
|
|
|
|
@sql |
|
|
|
def get_last_timestamp(self): |
|
|
@ -390,6 +376,7 @@ class ChannelDB(SqlDB): |
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def on_channel_update(self, msg_payloads): |
|
|
|
now = int(time.time()) |
|
|
|
if type(msg_payloads) is dict: |
|
|
|
msg_payloads = [msg_payloads] |
|
|
|
new_policies = {} |
|
|
@ -397,7 +384,9 @@ class ChannelDB(SqlDB): |
|
|
|
short_channel_id = msg_payload['short_channel_id'].hex() |
|
|
|
node_id = msg_payload['node_id'].hex() |
|
|
|
new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id) |
|
|
|
#self.logger.info(f'on_channel_update {datetime.fromtimestamp(new_policy.timestamp).ctime()}') |
|
|
|
# must not be older than two weeks |
|
|
|
if new_policy.timestamp < now - 14*24*3600: |
|
|
|
continue |
|
|
|
old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none() |
|
|
|
if old_policy: |
|
|
|
if old_policy.timestamp >= new_policy.timestamp: |
|
|
@ -414,7 +403,7 @@ class ChannelDB(SqlDB): |
|
|
|
self.DBSession.add(new_policy) |
|
|
|
self.DBSession.commit() |
|
|
|
if new_policies: |
|
|
|
self.logger.info(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}') |
|
|
|
self.logger.debug(f'on_channel_update: {len(new_policies)}/{len(msg_payloads)}') |
|
|
|
#self.logger.info(f'last timestamp: {datetime.fromtimestamp(self._get_last_timestamp()).ctime()}') |
|
|
|
self._update_counts() |
|
|
|
|
|
|
@ -446,7 +435,7 @@ class ChannelDB(SqlDB): |
|
|
|
new_nodes[node_id] = node_info |
|
|
|
for addr in node_addresses: |
|
|
|
new_addresses[(addr.node_id,addr.host,addr.port)] = addr |
|
|
|
self.logger.info("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) |
|
|
|
self.logger.debug("on_node_announcement: %d/%d"%(len(new_nodes), len(msg_payloads))) |
|
|
|
for node_info in new_nodes.values(): |
|
|
|
self.DBSession.add(node_info) |
|
|
|
for new_addr in new_addresses.values(): |
|
|
@ -467,6 +456,42 @@ class ChannelDB(SqlDB): |
|
|
|
return None |
|
|
|
return Policy.from_msg(msg, None, short_channel_id) # won't actually be written to DB |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def get_old_policies(self, delta): |
|
|
|
timestamp = int(time.time()) - delta |
|
|
|
old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp) |
|
|
|
return old_policies.distinct().count() |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def prune_old_policies(self, delta): |
|
|
|
# note: delete queries are order sensitive |
|
|
|
timestamp = int(time.time()) - delta |
|
|
|
old_policies = self.DBSession.query(Policy.short_channel_id).filter(Policy.timestamp <= timestamp) |
|
|
|
delete_old_channels = ChannelInfo.__table__.delete().where(ChannelInfo.short_channel_id.in_(old_policies)) |
|
|
|
delete_old_policies = Policy.__table__.delete().where(Policy.timestamp <= timestamp) |
|
|
|
self.DBSession.execute(delete_old_channels) |
|
|
|
self.DBSession.execute(delete_old_policies) |
|
|
|
self.DBSession.commit() |
|
|
|
self._update_counts() |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def get_orphaned_channels(self): |
|
|
|
subquery = self.DBSession.query(Policy.short_channel_id) |
|
|
|
orphaned = self.DBSession.query(ChannelInfo).filter(not_(ChannelInfo.short_channel_id.in_(subquery))) |
|
|
|
return orphaned.count() |
|
|
|
|
|
|
|
@sql |
|
|
|
@profiler |
|
|
|
def prune_orphaned_channels(self): |
|
|
|
subquery = self.DBSession.query(Policy.short_channel_id) |
|
|
|
delete_orphaned = ChannelInfo.__table__.delete().where(not_(ChannelInfo.short_channel_id.in_(subquery))) |
|
|
|
self.DBSession.execute(delete_orphaned) |
|
|
|
self.DBSession.commit() |
|
|
|
self._update_counts() |
|
|
|
|
|
|
|
def add_channel_update_for_private_channel(self, msg_payload: dict, start_node_id: bytes): |
|
|
|
if not verify_sig_for_channel_update(msg_payload, start_node_id): |
|
|
|
return # ignore |
|
|
|