@ -23,6 +23,7 @@
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from datetime import datetime
import time
import random
import queue
@ -157,9 +158,8 @@ class NodeInfo(Base):
addresses = NodeInfo . parse_addresses_field ( payload [ ' addresses ' ] )
alias = payload [ ' alias ' ] . rstrip ( b ' \x00 ' ) . hex ( )
timestamp = int . from_bytes ( payload [ ' timestamp ' ] , " big " )
now = int ( time . time ( ) )
return NodeInfo ( node_id = node_id , features = features , timestamp = timestamp , alias = alias ) , [
Address ( host = host , port = port , node_id = node_id , last_connected_date = now ) for host , port in addresses ]
Address ( host = host , port = port , node_id = node_id , last_connected_date = None ) for host , port in addresses ]
@staticmethod
def parse_addresses_field ( addresses_field ) :
@ -206,8 +206,7 @@ class Address(Base):
node_id = Column ( String ( 66 ) , ForeignKey ( ' node_info.node_id ' ) , primary_key = True )
host = Column ( String ( 256 ) , primary_key = True )
port = Column ( Integer , primary_key = True )
last_connected_date = Column ( Integer ( ) , nullable = False )
last_connected_date = Column ( Integer ( ) , nullable = True )
@ -273,11 +272,8 @@ class ChannelDB(SqlDB):
@sql
def get_recent_peers ( self ) :
return [ LNPeerAddr ( x . host , x . port , bytes . fromhex ( x . node_id ) ) for x in self . DBSession \
. query ( Address ) \
. select_from ( NodeInfo ) \
. order_by ( Address . last_connected_date . desc ( ) ) \
. limit ( self . NUM_MAX_RECENT_PEERS ) ]
r = self . DBSession . query ( Address ) . filter ( Address . last_connected_date . isnot ( None ) ) . order_by ( Address . last_connected_date . desc ( ) ) . limit ( self . NUM_MAX_RECENT_PEERS ) . all ( )
return [ LNPeerAddr ( x . host , x . port , bytes . fromhex ( x . node_id ) ) for x in r ]
@sql
def get_channel_info ( self , channel_id : bytes ) :
@ -298,15 +294,6 @@ class ChannelDB(SqlDB):
chan_ids_from_policy = set ( x [ 0 ] for x in self . DBSession . query ( Policy . short_channel_id ) . filter ( expr ) . all ( ) )
if chan_ids_from_policy :
return chan_ids_from_policy
# fetch channels for node_ids missing in node_info. that will also give us node_announcement
expr = not_ ( ChannelInfo . node1_id . in_ ( self . DBSession . query ( NodeInfo . node_id ) ) )
chan_ids_from_id1 = set ( x [ 0 ] for x in self . DBSession . query ( ChannelInfo . short_channel_id ) . filter ( expr ) . all ( ) )
if chan_ids_from_id1 :
return chan_ids_from_id1
expr = not_ ( ChannelInfo . node2_id . in_ ( self . DBSession . query ( NodeInfo . node_id ) ) )
chan_ids_from_id2 = set ( x [ 0 ] for x in self . DBSession . query ( ChannelInfo . short_channel_id ) . filter ( expr ) . all ( ) )
if chan_ids_from_id2 :
return chan_ids_from_id2
return set ( )
@sql
@ -318,7 +305,7 @@ class ChannelDB(SqlDB):
self . DBSession . commit ( )
@sql
# @profiler
@profiler
def on_channel_announcement ( self , msg_payloads , trusted = False ) :
if type ( msg_payloads ) is dict :
msg_payloads = [ msg_payloads ]
@ -342,10 +329,16 @@ class ChannelDB(SqlDB):
for channel_info in new_channels . values ( ) :
self . DBSession . add ( channel_info )
self . DBSession . commit ( )
self . print_error ( ' on_channel_announcement: %d / %d ' % ( len ( new_channels ) , len ( msg_payloads ) ) )
#self.print_error('on_channel_announcement: %d/%d'%(len(new_channels), len(msg_payloads)) )
self . _update_counts ( )
self . network . trigger_callback ( ' ln_status ' )
@sql
def get_last_timestamp ( self ) :
from sqlalchemy . sql import func
r = self . DBSession . query ( func . max ( Policy . timestamp ) . label ( ' max_timestamp ' ) ) . one ( )
return r . max_timestamp or 0
@sql
@profiler
def on_channel_update ( self , msg_payloads , trusted = False ) :
@ -368,7 +361,8 @@ class ChannelDB(SqlDB):
if not trusted and not verify_sig_for_channel_update ( msg_payload , bytes . fromhex ( node_id ) ) :
continue
short_channel_id = channel_info . short_channel_id
new_policy = Policy . from_msg ( msg_payload , node_id , channel_info . short_channel_id )
new_policy = Policy . from_msg ( msg_payload , node_id , short_channel_id )
#self.print_error('on_channel_update', datetime.fromtimestamp(new_policy.timestamp).ctime())
old_policy = self . DBSession . query ( Policy ) . filter_by ( short_channel_id = short_channel_id , start_node = node_id ) . one_or_none ( )
if old_policy :
if old_policy . timestamp > = new_policy . timestamp :
@ -378,6 +372,7 @@ class ChannelDB(SqlDB):
if p and p . timestamp > = new_policy . timestamp :
continue
new_policies [ ( short_channel_id , node_id ) ] = new_policy
self . print_error ( ' on_channel_update: %d / %d ' % ( len ( new_policies ) , len ( msg_payloads ) ) )
# commit pending removals
self . DBSession . commit ( )
# add and commit new policies
@ -386,7 +381,7 @@ class ChannelDB(SqlDB):
self . DBSession . commit ( )
@sql
# @profiler
@profiler
def on_node_announcement ( self , msg_payloads ) :
if type ( msg_payloads ) is dict :
msg_payloads = [ msg_payloads ]
@ -403,7 +398,13 @@ class ChannelDB(SqlDB):
node_info , node_addresses = NodeInfo . from_msg ( msg_payload )
except UnknownEvenFeatureBits :
continue
#self.print_error('received node announcement from', datetime.fromtimestamp(node_info.timestamp).ctime())
node_id = node_info . node_id
# Ignore node if it has no associated channel (DoS protection)
expr = or_ ( ChannelInfo . node1_id == node_id , ChannelInfo . node2_id == node_id )
if self . DBSession . query ( ChannelInfo . short_channel_id ) . filter ( expr ) . count ( ) == 0 :
#self.print_error('ignoring orphan node_announcement')
continue
node = self . DBSession . query ( NodeInfo ) . filter_by ( node_id = node_id ) . one_or_none ( )
if node and node . timestamp > = node_info . timestamp :
continue
@ -413,20 +414,13 @@ class ChannelDB(SqlDB):
new_nodes [ node_id ] = node_info
for addr in node_addresses :
new_addresses [ ( addr . node_id , addr . host , addr . port ) ] = addr
self . print_error ( " on_node_announcements: %d / %d " % ( len ( new_nodes ) , len ( msg_payloads ) ) )
self . print_error ( " on_node_announcement: %d / %d " % ( len ( new_nodes ) , len ( msg_payloads ) ) )
for node_info in new_nodes . values ( ) :
self . DBSession . add ( node_info )
for new_addr in new_addresses . values ( ) :
old_addr = self . DBSession . query ( Address ) . filter_by ( node_id = new_addr . node_id , host = new_addr . host , port = new_addr . port ) . one_or_none ( )
if old_addr :
old_addr . last_connected_date = new_addr . last_connected_date
else :
if not old_addr :
self . DBSession . add ( new_addr )
# TODO if this message is for a new node, and if we have no associated
# channels for this node, we should ignore the message and return here,
# to mitigate DOS. but race condition: the channels we have for this
# node, might be under verification in self.ca_verifier, what then?
self . DBSession . commit ( )
self . _update_counts ( )
self . network . trigger_callback ( ' ln_status ' )