@ -267,12 +267,37 @@ where
self . awaiting_status . len ( )
self . awaiting_status . len ( )
) ;
) ;
let txid_to_script = self
. awaiting_status
. keys ( )
. cloned ( )
. collect ::< HashMap < _ , _ > > ( ) ;
let histories = self
let histories = self
. client
. client
. batch_script_get_history ( self . awaiting_status . keys ( ) . map ( | ( _ , script ) | script ) )
. batch_script_get_history ( self . awaiting_status . keys ( ) . map ( | ( _ , script ) | script ) )
. context ( "Failed to get script histories" ) ? ;
. context ( "Failed to get script histories" ) ? ;
self . update_state ( latest_block_height , histories ) . await ? ;
let mut histories_grouped_by_txid = HashMap ::new ( ) ;
for history in histories {
for response in history {
let txid = response . tx_hash ;
let script = match txid_to_script . get ( & txid ) {
None = > {
tracing ::trace ! (
"Could not find script in own state for txid {}, ignoring" ,
txid
) ;
continue ;
}
Some ( script ) = > script ,
} ;
histories_grouped_by_txid . insert ( ( txid , script . clone ( ) ) , response ) ;
}
}
self . update_state ( latest_block_height , histories_grouped_by_txid )
. await ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
@ -288,7 +313,7 @@ where
async fn update_state (
async fn update_state (
& mut self ,
& mut self ,
latest_block_height : BlockHeight ,
latest_block_height : BlockHeight ,
histories : Vec < Vec < GetHistoryRes > > ,
histories : HashMap < ( Txid , Script ) , GetHistoryRes > ,
) -> Result < ( ) > {
) -> Result < ( ) > {
if latest_block_height > self . latest_block_height {
if latest_block_height > self . latest_block_height {
tracing ::debug ! (
tracing ::debug ! (
@ -298,30 +323,29 @@ where
self . latest_block_height = latest_block_height ;
self . latest_block_height = latest_block_height ;
}
}
// 1. shape response into local data format
// 1. Decide new status based on script history
let new_status = histories . into_iter ( ) . zip ( self . awaiting_status . keys ( ) . cloned ( ) ) . map ( | ( script_history , ( txid , script ) ) | {
let new_status = self
let new_script_status = match script_history . as_slice ( ) {
. awaiting_status
[ ] = > ScriptStatus ::Unseen ,
. iter ( )
[ remaining @ . . , last ] = > {
. map ( | ( key , _old_status ) | {
if ! remaining . is_empty ( ) {
let new_script_status = match histories . get ( key ) {
tracing ::warn ! ( "Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored" )
None = > ScriptStatus ::Unseen ,
}
Some ( history_entry ) = > {
if history_entry . height < = 0 {
if last . height < = 0 {
ScriptStatus ::InMempool
ScriptStatus ::InMempool
} else {
} else {
ScriptStatus ::Confirmed ( Confirmed ::from_inclusion_and_latest_block (
ScriptStatus ::Confirmed (
u32 ::try_from ( history_entry . height )
Confirmed ::from_inclusion_and_latest_block (
. expect ( "we checked that height is > 0" ) ,
u32 ::try_from ( last . height ) . expect ( "we checked that height is > 0" ) ,
u32 ::from ( self . latest_block_height ) ,
u32 ::from ( self . latest_block_height ) ,
) ,
) )
)
}
}
}
}
} ;
} ;
( ( txid , script ) , new_script_status )
( key . clone ( ) , new_script_status )
} ) . collect ::< BTreeMap < _ , _ > > ( ) ;
} )
. collect ::< BTreeMap < _ , _ > > ( ) ;
// 2. log any changes since our last sync
// 2. log any changes since our last sync
for ( ( txid , script ) , status ) in new_status . iter ( ) {
for ( ( txid , script ) , status ) in new_status . iter ( ) {
@ -359,7 +383,17 @@ where
tracing ::trace ! ( "{} subscriptions reached their monitoring target, {} remaining for this script" , reached_monitoring_target . len ( ) , remaining . len ( ) ) ;
tracing ::trace ! ( "{} subscriptions reached their monitoring target, {} remaining for this script" , reached_monitoring_target . len ( ) , remaining . len ( ) ) ;
occupied . insert ( remaining ) ;
// TODO: When reaching finality of a final tx (CET, refund_tx,
// collaborate_close_tx) we have to remove the remaining "competing"
// transactions. This is not critical, but when fetching
// `GetHistoryRes` by script we can have entries that we don't care about
// anymore.
if remaining . is_empty ( ) {
occupied . remove ( ) ;
} else {
occupied . insert ( remaining ) ;
}
for ( target_status , event ) in reached_monitoring_target {
for ( target_status , event ) in reached_monitoring_target {
tracing ::info ! ( % txid , target = % target_status , current = % status , "Bitcoin transaction reached monitoring target" ) ;
tracing ::info ! ( % txid , target = % target_status , current = % status , "Bitcoin transaction reached monitoring target" ) ;
@ -660,6 +694,75 @@ mod tests {
assert_eq ! ( recorder . events [ 1 ] , refund_expired ) ;
assert_eq ! ( recorder . events [ 1 ] , refund_expired ) ;
}
}
#[ tokio::test ]
async fn update_for_a_script_only_results_in_event_for_corresponding_transaction ( ) {
let _guard = tracing_subscriber ::fmt ( )
. with_env_filter ( "trace" )
. with_test_writer ( )
. set_default ( ) ;
let ( recorder_address , mut recorder_context ) =
xtra ::Context ::< MessageRecordingActor > ::new ( None ) ;
let mut recorder = MessageRecordingActor ::default ( ) ;
let cet_finality = Event ::CetFinality ( OrderId ::default ( ) ) ;
let refund_finality = Event ::RefundFinality ( OrderId ::default ( ) ) ;
let mut monitor = Actor ::for_test (
recorder_address ,
[
(
( txid1 ( ) , script1 ( ) ) ,
vec ! [ ( ScriptStatus ::finality ( ) , cet_finality . clone ( ) ) ] ,
) ,
(
( txid2 ( ) , script1 ( ) ) ,
vec ! [ ( ScriptStatus ::finality ( ) , refund_finality . clone ( ) ) ] ,
) ,
] ,
) ;
monitor . client . include_tx ( txid1 ( ) , 5 ) ;
recorder_context
. handle_while ( & mut recorder , monitor . sync ( ) )
. await
. unwrap ( ) ;
assert ! ( recorder . events . contains ( & cet_finality ) ) ;
assert ! ( ! recorder . events . contains ( & refund_finality ) ) ;
}
#[ tokio::test ]
async fn stop_monitoring_after_target_reached ( ) {
let _guard = tracing_subscriber ::fmt ( )
. with_env_filter ( "trace" )
. with_test_writer ( )
. set_default ( ) ;
let ( recorder_address , mut recorder_context ) =
xtra ::Context ::< MessageRecordingActor > ::new ( None ) ;
let mut recorder = MessageRecordingActor ::default ( ) ;
let cet_finality = Event ::CetFinality ( OrderId ::default ( ) ) ;
let mut monitor = Actor ::for_test (
recorder_address ,
[ (
( txid1 ( ) , script1 ( ) ) ,
vec ! [ ( ScriptStatus ::finality ( ) , cet_finality . clone ( ) ) ] ,
) ] ,
) ;
monitor . client . include_tx ( txid1 ( ) , 5 ) ;
recorder_context
. handle_while ( & mut recorder , monitor . sync ( ) )
. await
. unwrap ( ) ;
assert ! ( recorder . events . contains ( & cet_finality ) ) ;
assert ! ( monitor . awaiting_status . is_empty ( ) ) ;
}
impl < A > Actor < A , stub ::Client >
impl < A > Actor < A , stub ::Client >
where
where
A : xtra ::Actor + xtra ::Handler < Event > ,
A : xtra ::Actor + xtra ::Handler < Event > ,
@ -686,6 +789,12 @@ mod tests {
. unwrap ( )
. unwrap ( )
}
}
fn txid2 ( ) -> Txid {
"07ade6a49e34ad4cc3ca3f79d78df462685f8f1fbc8a9b05af51ec503ea5b960"
. parse ( )
. unwrap ( )
}
fn script1 ( ) -> Script {
fn script1 ( ) -> Script {
"6a4c50001d97ca0002d3829148f63cc8ee21241e3f1c5eaee58781dd45a7d814710fac571b92aadff583e85d5a295f61856f469b401efe615657bf040c32f1000065bce011a420ca9ea3657fff154d95d1a95c" . parse ( ) . unwrap ( )
"6a4c50001d97ca0002d3829148f63cc8ee21241e3f1c5eaee58781dd45a7d814710fac571b92aadff583e85d5a295f61856f469b401efe615657bf040c32f1000065bce011a420ca9ea3657fff154d95d1a95c" . parse ( ) . unwrap ( )
}
}