diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 447f463..24e6670 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -267,12 +267,37 @@ where self.awaiting_status.len() ); + let txid_to_script = self + .awaiting_status + .keys() + .cloned() + .collect::>(); + let histories = self .client .batch_script_get_history(self.awaiting_status.keys().map(|(_, script)| script)) .context("Failed to get script histories")?; - self.update_state(latest_block_height, histories).await?; + let mut histories_grouped_by_txid = HashMap::new(); + for history in histories { + for response in history { + let txid = response.tx_hash; + let script = match txid_to_script.get(&txid) { + None => { + tracing::trace!( + "Could not find script in own state for txid {}, ignoring", + txid + ); + continue; + } + Some(script) => script, + }; + histories_grouped_by_txid.insert((txid, script.clone()), response); + } + } + + self.update_state(latest_block_height, histories_grouped_by_txid) + .await?; Ok(()) } @@ -288,7 +313,7 @@ where async fn update_state( &mut self, latest_block_height: BlockHeight, - histories: Vec>, + histories: HashMap<(Txid, Script), GetHistoryRes>, ) -> Result<()> { if latest_block_height > self.latest_block_height { tracing::debug!( @@ -298,30 +323,29 @@ where self.latest_block_height = latest_block_height; } - // 1. shape response into local data format - let new_status = histories.into_iter().zip(self.awaiting_status.keys().cloned()).map(|(script_history, (txid, script))| { - let new_script_status = match script_history.as_slice() { - [] => ScriptStatus::Unseen, - [remaining @ .., last] => { - if !remaining.is_empty() { - tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored") - } - - if last.height <= 0 { - ScriptStatus::InMempool - } else { - ScriptStatus::Confirmed( - Confirmed::from_inclusion_and_latest_block( - u32::try_from(last.height).expect("we checked that height is > 0"), + // 1. Decide new status based on script history + let new_status = self + .awaiting_status + .iter() + .map(|(key, _old_status)| { + let new_script_status = match histories.get(key) { + None => ScriptStatus::Unseen, + Some(history_entry) => { + if history_entry.height <= 0 { + ScriptStatus::InMempool + } else { + ScriptStatus::Confirmed(Confirmed::from_inclusion_and_latest_block( + u32::try_from(history_entry.height) + .expect("we checked that height is > 0"), u32::from(self.latest_block_height), - ), - ) + )) + } } - } - }; + }; - ((txid, script), new_script_status) - }).collect::>(); + (key.clone(), new_script_status) + }) + .collect::>(); // 2. log any changes since our last sync for ((txid, script), status) in new_status.iter() { @@ -359,7 +383,17 @@ where tracing::trace!("{} subscriptions reached their monitoring target, {} remaining for this script", reached_monitoring_target.len(), remaining.len()); - occupied.insert(remaining); + // TODO: When reaching finality of a final tx (CET, refund_tx, + // collaborate_close_tx) we have to remove the remaining "competing" + // transactions. This is not critical, but when fetching + // `GetHistoryRes` by script we can have entries that we don't care about + // anymore. + + if remaining.is_empty() { + occupied.remove(); + } else { + occupied.insert(remaining); + } for (target_status, event) in reached_monitoring_target { tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); @@ -660,6 +694,75 @@ mod tests { assert_eq!(recorder.events[1], refund_expired); } + #[tokio::test] + async fn update_for_a_script_only_results_in_event_for_corresponding_transaction() { + let _guard = tracing_subscriber::fmt() + .with_env_filter("trace") + .with_test_writer() + .set_default(); + + let (recorder_address, mut recorder_context) = + xtra::Context::::new(None); + let mut recorder = MessageRecordingActor::default(); + + let cet_finality = Event::CetFinality(OrderId::default()); + let refund_finality = Event::RefundFinality(OrderId::default()); + + let mut monitor = Actor::for_test( + recorder_address, + [ + ( + (txid1(), script1()), + vec![(ScriptStatus::finality(), cet_finality.clone())], + ), + ( + (txid2(), script1()), + vec![(ScriptStatus::finality(), refund_finality.clone())], + ), + ], + ); + monitor.client.include_tx(txid1(), 5); + + recorder_context + .handle_while(&mut recorder, monitor.sync()) + .await + .unwrap(); + + assert!(recorder.events.contains(&cet_finality)); + assert!(!recorder.events.contains(&refund_finality)); + } + + #[tokio::test] + async fn stop_monitoring_after_target_reached() { + let _guard = tracing_subscriber::fmt() + .with_env_filter("trace") + .with_test_writer() + .set_default(); + + let (recorder_address, mut recorder_context) = + xtra::Context::::new(None); + let mut recorder = MessageRecordingActor::default(); + + let cet_finality = Event::CetFinality(OrderId::default()); + + let mut monitor = Actor::for_test( + recorder_address, + [( + (txid1(), script1()), + vec![(ScriptStatus::finality(), cet_finality.clone())], + )], + ); + monitor.client.include_tx(txid1(), 5); + + recorder_context + .handle_while(&mut recorder, monitor.sync()) + .await + .unwrap(); + + assert!(recorder.events.contains(&cet_finality)); + assert!(monitor.awaiting_status.is_empty()); + } + impl Actor where A: xtra::Actor + xtra::Handler, @@ -686,6 +789,12 @@ mod tests { .unwrap() } + fn txid2() -> Txid { + "07ade6a49e34ad4cc3ca3f79d78df462685f8f1fbc8a9b05af51ec503ea5b960" + .parse() + .unwrap() + } + fn script1() -> Script { "6a4c50001d97ca0002d3829148f63cc8ee21241e3f1c5eaee58781dd45a7d814710fac571b92aadff583e85d5a295f61856f469b401efe615657bf040c32f1000065bce011a420ca9ea3657fff154d95d1a95c".parse().unwrap() }