@ -48,7 +48,7 @@ class SweepTx(Base):
__tablename__ = ' sweep_txs '
funding_outpoint = Column ( String ( 34 ) , primary_key = True )
index = Column ( Integer ( ) , primary_key = True )
prev_txid = Column ( String ( 32 ) )
prevout = Column ( String ( 34 ) )
tx = Column ( String ( ) )
class ChannelInfo ( Base ) :
@ -64,22 +64,22 @@ class SweepStore(SqlDB):
super ( ) . __init__ ( network , path , Base )
@sql
def get_sweep_tx ( self , funding_outpoint , prev_txid ) :
return [ Transaction ( bh2u ( r . tx ) ) for r in self . DBSession . query ( SweepTx ) . filter ( SweepTx . funding_outpoint == funding_outpoint , SweepTx . prev_txid == prev_txid ) . all ( ) ]
def get_sweep_tx ( self , funding_outpoint , prevout ) :
return [ Transaction ( bh2u ( r . tx ) ) for r in self . DBSession . query ( SweepTx ) . filter ( SweepTx . funding_outpoint == funding_outpoint , SweepTx . prevout == prevout ) . all ( ) ]
@sql
def get_tx_by_index ( self , funding_outpoint , index ) :
r = self . DBSession . query ( SweepTx ) . filter ( SweepTx . funding_outpoint == funding_outpoint , SweepTx . index == index ) . one_or_none ( )
return r . prev_txid , bh2u ( r . tx )
return r . prevout , bh2u ( r . tx )
@sql
def list_sweep_tx ( self ) :
return set ( r . funding_outpoint for r in self . DBSession . query ( SweepTx ) . all ( ) )
@sql
def add_sweep_tx ( self , funding_outpoint , prev_txid , tx ) :
def add_sweep_tx ( self , funding_outpoint , prevout , tx ) :
n = self . DBSession . query ( SweepTx ) . filter ( funding_outpoint == funding_outpoint ) . count ( )
self . DBSession . add ( SweepTx ( funding_outpoint = funding_outpoint , index = n , prev_txid = prev_txid , tx = bfh ( tx ) ) )
self . DBSession . add ( SweepTx ( funding_outpoint = funding_outpoint , index = n , prevout = prevout , tx = bfh ( tx ) ) )
self . DBSession . commit ( )
@sql
@ -172,8 +172,8 @@ class LNWatcher(AddressSynchronizer):
self . watchtower . add_channel ( outpoint , address )
self . logger . info ( " sending %d transactions to watchtower " % ( local_n - n ) )
for index in range ( n , local_n ) :
prev_txid , tx = self . sweepstore . get_tx_by_index ( outpoint , index )
self . watchtower . add_sweep_tx ( outpoint , prev_txid , tx )
prevout , tx = self . sweepstore . get_tx_by_index ( outpoint , index )
self . watchtower . add_sweep_tx ( outpoint , prevout , tx )
except ConnectionRefusedError :
self . logger . info ( ' could not reach watchtower, will retry in 5s ' )
await asyncio . sleep ( 5 )
@ -260,11 +260,10 @@ class LNWatcher(AddressSynchronizer):
for prevout , spender in spenders . items ( ) :
if spender is not None :
continue
prev_txid , prev_n = prevout . split ( ' : ' )
sweep_txns = self . sweepstore . get_sweep_tx ( funding_outpoint , prev_txid )
sweep_txns = self . sweepstore . get_sweep_tx ( funding_outpoint , prevout )
for tx in sweep_txns :
if not await self . broadcast_or_log ( funding_outpoint , tx ) :
self . logger . info ( f ' { tx . name } could not publish tx: { str ( tx ) } , prev_txid: { prev_txid } ' )
self . logger . info ( f ' { tx . name } could not publish tx: { str ( tx ) } , prevout: { prevout } ' )
async def broadcast_or_log ( self , funding_outpoint , tx ) :
height = self . get_tx_height ( tx . txid ( ) ) . height
@ -280,8 +279,8 @@ class LNWatcher(AddressSynchronizer):
await self . tx_progress [ funding_outpoint ] . tx_queue . put ( tx )
return txid
def add_sweep_tx ( self , funding_outpoint : str , prev_txid : str , tx : str ) :
self . sweepstore . add_sweep_tx ( funding_outpoint , prev_txid , tx )
def add_sweep_tx ( self , funding_outpoint : str , prevout : str , tx : str ) :
self . sweepstore . add_sweep_tx ( funding_outpoint , prevout , tx )
if self . watchtower :
self . watchtower_queue . put_nowait ( funding_outpoint )