From 9cf09b209e3f0d25f17412205978870887821dab Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 11 Oct 2021 14:33:06 +1100 Subject: [PATCH 1/3] Fix monitoring In this protocol we *can* actually have transactions with the same script, hence, we have to handle the `GetHistoryRes` by `Txid`. To make this easier to understand we group the `Vec>` by `Txid` into `HashMap`. Note: The assumption is, that only `GetHistroyRes` is returned for the `Txid`+`Script` combination. --- daemon/src/monitor.rs | 114 +++++++++++++++++++++++++++++++++--------- 1 file changed, 91 insertions(+), 23 deletions(-) diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 447f463..64ab529 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -267,12 +267,37 @@ where self.awaiting_status.len() ); + let txid_to_script = self + .awaiting_status + .keys() + .cloned() + .collect::>(); + let histories = self .client .batch_script_get_history(self.awaiting_status.keys().map(|(_, script)| script)) .context("Failed to get script histories")?; - self.update_state(latest_block_height, histories).await?; + let mut histories_grouped_by_txid = HashMap::new(); + for history in histories { + for response in history { + let txid = response.tx_hash; + let script = match txid_to_script.get(&txid) { + None => { + tracing::error!( + "Could not find script in own state for txid {}, ignoring", + txid + ); + continue; + } + Some(script) => script, + }; + histories_grouped_by_txid.insert((txid, script.clone()), response); + } + } + + self.update_state(latest_block_height, histories_grouped_by_txid) + .await?; Ok(()) } @@ -288,7 +313,7 @@ where async fn update_state( &mut self, latest_block_height: BlockHeight, - histories: Vec>, + histories: HashMap<(Txid, Script), GetHistoryRes>, ) -> Result<()> { if latest_block_height > self.latest_block_height { tracing::debug!( @@ -298,30 +323,29 @@ where self.latest_block_height = latest_block_height; } - // 1. shape response into local data format - let new_status = histories.into_iter().zip(self.awaiting_status.keys().cloned()).map(|(script_history, (txid, script))| { - let new_script_status = match script_history.as_slice() { - [] => ScriptStatus::Unseen, - [remaining @ .., last] => { - if !remaining.is_empty() { - tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored") - } - - if last.height <= 0 { - ScriptStatus::InMempool - } else { - ScriptStatus::Confirmed( - Confirmed::from_inclusion_and_latest_block( - u32::try_from(last.height).expect("we checked that height is > 0"), + // 1. Decide new status based on script history + let new_status = self + .awaiting_status + .iter() + .map(|(key, _old_status)| { + let new_script_status = match histories.get(key) { + None => ScriptStatus::Unseen, + Some(history_entry) => { + if history_entry.height <= 0 { + ScriptStatus::InMempool + } else { + ScriptStatus::Confirmed(Confirmed::from_inclusion_and_latest_block( + u32::try_from(history_entry.height) + .expect("we checked that height is > 0"), u32::from(self.latest_block_height), - ), - ) + )) + } } - } - }; + }; - ((txid, script), new_script_status) - }).collect::>(); + (key.clone(), new_script_status) + }) + .collect::>(); // 2. log any changes since our last sync for ((txid, script), status) in new_status.iter() { @@ -660,6 +684,44 @@ mod tests { assert_eq!(recorder.events[1], refund_expired); } + #[tokio::test] + async fn update_for_a_script_only_results_in_event_for_corresponding_transaction() { + let _guard = tracing_subscriber::fmt() + .with_env_filter("trace") + .with_test_writer() + .set_default(); + + let (recorder_address, mut recorder_context) = + xtra::Context::::new(None); + let mut recorder = MessageRecordingActor::default(); + + let cet_finality = Event::CetFinality(OrderId::default()); + let refund_finality = Event::RefundFinality(OrderId::default()); + + let mut monitor = Actor::for_test( + recorder_address, + [ + ( + (txid1(), script1()), + vec![(ScriptStatus::finality(), cet_finality.clone())], + ), + ( + (txid2(), script1()), + vec![(ScriptStatus::finality(), refund_finality.clone())], + ), + ], + ); + monitor.client.include_tx(txid1(), 5); + + recorder_context + .handle_while(&mut recorder, monitor.sync()) + .await + .unwrap(); + + assert!(recorder.events.contains(&cet_finality)); + assert!(!recorder.events.contains(&refund_finality)); + } + impl Actor where A: xtra::Actor + xtra::Handler, @@ -686,6 +748,12 @@ mod tests { .unwrap() } + fn txid2() -> Txid { + "07ade6a49e34ad4cc3ca3f79d78df462685f8f1fbc8a9b05af51ec503ea5b960" + .parse() + .unwrap() + } + fn script1() -> Script { "6a4c50001d97ca0002d3829148f63cc8ee21241e3f1c5eaee58781dd45a7d814710fac571b92aadff583e85d5a295f61856f469b401efe615657bf040c32f1000065bce011a420ca9ea3657fff154d95d1a95c".parse().unwrap() } From 179952ce5b3817f19e5bb178790c1da9142ca365 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 11 Oct 2021 16:26:34 +1100 Subject: [PATCH 2/3] Cleanup `awaiting_status` in case we don't have remaining subscriptions We always replaced it with `remaining`, but when populating `awaiting_status` upon sync we only go by the keys, so we actually fill in the script status again. We should remove the complete entry if we want to make sure that we don't want to process it anymore. Note: Kept seeing monitoring logs for things that were already past Finality (...) so we looked into this. --- daemon/src/monitor.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 64ab529..c046839 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -383,7 +383,11 @@ where tracing::trace!("{} subscriptions reached their monitoring target, {} remaining for this script", reached_monitoring_target.len(), remaining.len()); - occupied.insert(remaining); + if remaining.is_empty() { + occupied.remove(); + } else { + occupied.insert(remaining); + } for (target_status, event) in reached_monitoring_target { tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); @@ -722,6 +726,37 @@ mod tests { assert!(!recorder.events.contains(&refund_finality)); } + #[tokio::test] + async fn stop_monitoring_after_target_reached() { + let _guard = tracing_subscriber::fmt() + .with_env_filter("trace") + .with_test_writer() + .set_default(); + + let (recorder_address, mut recorder_context) = + xtra::Context::::new(None); + let mut recorder = MessageRecordingActor::default(); + + let cet_finality = Event::CetFinality(OrderId::default()); + + let mut monitor = Actor::for_test( + recorder_address, + [( + (txid1(), script1()), + vec![(ScriptStatus::finality(), cet_finality.clone())], + )], + ); + monitor.client.include_tx(txid1(), 5); + + recorder_context + .handle_while(&mut recorder, monitor.sync()) + .await + .unwrap(); + + assert!(recorder.events.contains(&cet_finality)); + assert!(monitor.awaiting_status.is_empty()); + } + impl Actor where A: xtra::Actor + xtra::Handler, From 8d0390156598dc0d38eee5d34e47daf8bc74f616 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 11 Oct 2021 17:18:11 +1100 Subject: [PATCH 3/3] Avoid spamming the logs and add Todo We keep seeing this in the logs, after a final tx went in: ``` 2021-10-11 16:58:18 ERROR Could not find script in own state for txid b542cbc1920bdd926d79fb17377eda7152ed944ec64a1b09ce68dd11d9e7467e, ignoring ``` This happens because multiple transactions have the same script (e.g. CET, refund). Once the CET is final we remove it from monitoring, which results in the txid not being available when matching. --- daemon/src/monitor.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index c046839..24e6670 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -284,7 +284,7 @@ where let txid = response.tx_hash; let script = match txid_to_script.get(&txid) { None => { - tracing::error!( + tracing::trace!( "Could not find script in own state for txid {}, ignoring", txid ); @@ -383,6 +383,12 @@ where tracing::trace!("{} subscriptions reached their monitoring target, {} remaining for this script", reached_monitoring_target.len(), remaining.len()); + // TODO: When reaching finality of a final tx (CET, refund_tx, + // collaborate_close_tx) we have to remove the remaining "competing" + // transactions. This is not critical, but when fetching + // `GetHistoryRes` by script we can have entries that we don't care about + // anymore. + if remaining.is_empty() { occupied.remove(); } else {