Browse Source

Tie lifetimes of spawned tasks to actors

rollover-test-2
Mariusz Klochowicz 3 years ago
parent
commit
bea968d27b
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 18
      daemon/src/connection.rs
  2. 8
      daemon/src/maker.rs
  3. 30
      daemon/src/maker_cfd.rs
  4. 8
      daemon/src/taker.rs
  5. 30
      daemon/src/taker_cfd.rs
  6. 22
      daemon/tests/harness/mod.rs

18
daemon/src/connection.rs

@ -15,7 +15,7 @@ use xtra_productivity::xtra_productivity;
struct ConnectedState { struct ConnectedState {
last_heartbeat: SystemTime, last_heartbeat: SystemTime,
_pulse_handle: RemoteHandle<()>, _tasks: Vec<RemoteHandle<()>>,
} }
pub struct Actor { pub struct Actor {
@ -97,22 +97,26 @@ impl Actor {
let send_to_socket = send_to_socket::Actor::new(write, noise.clone()); let send_to_socket = send_to_socket::Actor::new(write, noise.clone());
tokio::spawn(self.send_to_maker_ctx.attach(send_to_socket)); let mut tasks = vec![self
.send_to_maker_ctx
.attach(send_to_socket)
.spawn_with_handle()];
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise)) let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise))
.map(move |item| MakerStreamMessage { item }); .map(move |item| MakerStreamMessage { item });
let this = ctx.address().expect("self to be alive"); let this = ctx.address().expect("self to be alive");
tokio::spawn(this.attach_stream(read)); tasks.push(this.attach_stream(read).spawn_with_handle());
let pulse_remote_handle = ctx tasks.push(
.notify_interval(self.timeout, || MeasurePulse) ctx.notify_interval(self.timeout, || MeasurePulse)
.expect("we just started") .expect("we just started")
.spawn_with_handle(); .spawn_with_handle(),
);
self.connected_state = Some(ConnectedState { self.connected_state = Some(ConnectedState {
last_heartbeat: SystemTime::now(), last_heartbeat: SystemTime::now(),
_pulse_handle: pulse_remote_handle, _tasks: tasks,
}); });
self.status_sender self.status_sender
.send(ConnectionStatus::Online) .send(ConnectionStatus::Online)

8
daemon/src/maker.rs

@ -224,7 +224,7 @@ async fn main() -> Result<()> {
tracing::info!("Listening on {}", local_addr); tracing::info!("Listening on {}", local_addr);
let (task, quote_updates) = bitmex_price_feed::new().await?; let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); let _task = task.spawn_with_handle();
let db = SqlitePool::connect_with( let db = SqlitePool::connect_with(
SqliteConnectOptions::new() SqliteConnectOptions::new()
@ -282,9 +282,11 @@ async fn main() -> Result<()> {
Poll::Ready(Some(message)) Poll::Ready(Some(message))
}); });
tokio::spawn(incoming_connection_addr.attach_stream(listener_stream)); let _listener_task = incoming_connection_addr
.attach_stream(listener_stream)
.spawn_with_handle();
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); let _wallet_sync_task = wallet_sync::new(wallet, wallet_feed_sender).spawn_with_handle();
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr); let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel = MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr); let new_order_channel = MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);

30
daemon/src/maker_cfd.rs

@ -8,11 +8,13 @@ use crate::model::cfd::{
}; };
use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::tokio_ext::FutureExt;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire}; use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt}; use futures::{future, SinkExt};
use maia::secp256k1_zkp::Signature; use maia::secp256k1_zkp::Signature;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
@ -81,6 +83,7 @@ enum SetupState {
Active { Active {
taker: TakerId, taker: TakerId,
sender: mpsc::UnboundedSender<wire::SetupMsg>, sender: mpsc::UnboundedSender<wire::SetupMsg>,
_task: RemoteHandle<()>,
}, },
None, None,
} }
@ -89,6 +92,7 @@ enum RollOverState {
Active { Active {
taker: TakerId, taker: TakerId,
sender: mpsc::UnboundedSender<wire::RollOverMsg>, sender: mpsc::UnboundedSender<wire::RollOverMsg>,
_task: RemoteHandle<()>,
}, },
None, None,
} }
@ -198,7 +202,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
msg: wire::SetupMsg, msg: wire::SetupMsg,
) -> Result<()> { ) -> Result<()> {
match &mut self.setup_state { match &mut self.setup_state {
SetupState::Active { taker, sender } if taker_id == *taker => { SetupState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?; sender.send(msg).await?;
} }
SetupState::Active { taker, .. } => { SetupState::Active { taker, .. } => {
@ -218,7 +222,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
msg: wire::RollOverMsg, msg: wire::RollOverMsg,
) -> Result<()> { ) -> Result<()> {
match &mut self.roll_over_state { match &mut self.roll_over_state {
RollOverState::Active { taker, sender } if taker_id == *taker => { RollOverState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?; sender.send(msg).await?;
} }
RollOverState::Active { taker, .. } => { RollOverState::Active { taker, .. } => {
@ -622,18 +626,20 @@ where
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
tokio::spawn(async move { let task = async move {
let dlc = contract_future.await; let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc }) this.send(CfdSetupCompleted { order_id, dlc })
.await .await
.expect("always connected to ourselves"); .expect("always connected to ourselves");
}); }
.spawn_with_handle();
// 6. Record that we are in an active contract setup // 6. Record that we are in an active contract setup
self.setup_state = SetupState::Active { self.setup_state = SetupState::Active {
sender, sender,
taker: taker_id, taker: taker_id,
_task: task,
}; };
Ok(()) Ok(())
@ -785,18 +791,20 @@ where
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active { let task = async move {
sender,
taker: taker_id,
};
tokio::spawn(async move {
let dlc = contract_future.await; let dlc = contract_future.await;
this.send(CfdRollOverCompleted { order_id, dlc }) this.send(CfdRollOverCompleted { order_id, dlc })
.await .await
.expect("always connected to ourselves") .expect("always connected to ourselves")
}); }
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
taker: taker_id,
_task: task,
};
self.remove_pending_proposal(&order_id) self.remove_pending_proposal(&order_id)
.context("accepted roll_over")?; .context("accepted roll_over")?;

8
daemon/src/taker.rs

@ -208,7 +208,7 @@ async fn main() -> Result<()> {
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let (task, quote_updates) = bitmex_price_feed::new().await?; let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); let _task = task.spawn_with_handle();
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip())) .merge(("address", opts.http_address.ip()))
@ -258,7 +258,7 @@ async fn main() -> Result<()> {
connect(connection_actor_addr, opts.maker_id, opts.maker).await?; connect(connection_actor_addr, opts.maker_id, opts.maker).await?;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); let _wallet_sync_task = wallet_sync::new(wallet, wallet_feed_sender).spawn_with_handle();
let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr); let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr); let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
@ -290,7 +290,7 @@ async fn main() -> Result<()> {
let shutdown_handle = rocket.shutdown(); let shutdown_handle = rocket.shutdown();
// shutdown the rocket server maker if goes offline // shutdown the rocket server maker if goes offline
tokio::spawn(async move { let _rocket_shutdown_task = (async move {
loop { loop {
maker_online_status_feed_receiver.changed().await.unwrap(); maker_online_status_feed_receiver.changed().await.unwrap();
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline {
@ -299,7 +299,7 @@ async fn main() -> Result<()> {
return; return;
} }
} }
}); }).spawn_with_handle();
rocket.launch().await?; rocket.launch().await?;
db.close().await; db.close().await;

30
daemon/src/taker_cfd.rs

@ -7,12 +7,14 @@ use crate::model::cfd::{
}; };
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::tokio_ext::FutureExt;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wallet, wire}; use crate::{log_error, oracle, setup_contract, wallet, wire};
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt}; use futures::{future, SinkExt};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::watch; use tokio::sync::watch;
@ -49,6 +51,7 @@ pub struct CfdRollOverCompleted {
enum SetupState { enum SetupState {
Active { Active {
sender: mpsc::UnboundedSender<SetupMsg>, sender: mpsc::UnboundedSender<SetupMsg>,
_task: RemoteHandle<()>,
}, },
None, None,
} }
@ -56,6 +59,7 @@ enum SetupState {
enum RollOverState { enum RollOverState {
Active { Active {
sender: mpsc::UnboundedSender<RollOverMsg>, sender: mpsc::UnboundedSender<RollOverMsg>,
_task: RemoteHandle<()>,
}, },
None, None,
} }
@ -252,7 +256,7 @@ where
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state { match &mut self.setup_state {
SetupState::Active { sender } => { SetupState::Active { sender, .. } => {
sender.send(msg).await?; sender.send(msg).await?;
} }
SetupState::None => { SetupState::None => {
@ -265,7 +269,7 @@ where
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state { match &mut self.roll_over_state {
RollOverState::Active { sender } => { RollOverState::Active { sender, .. } => {
sender.send(msg).await?; sender.send(msg).await?;
} }
RollOverState::None => { RollOverState::None => {
@ -515,15 +519,19 @@ where
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
tokio::spawn(async move { let task = async move {
let dlc = contract_future.await; let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc }) this.send(CfdSetupCompleted { order_id, dlc })
.await .await
.expect("always connected to ourselves") .expect("always connected to ourselves")
}); }
.spawn_with_handle();
self.setup_state = SetupState::Active { sender }; self.setup_state = SetupState::Active {
sender,
_task: task,
};
Ok(()) Ok(())
} }
@ -578,15 +586,19 @@ where
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active { sender }; let task = async move {
tokio::spawn(async move {
let dlc = contract_future.await; let dlc = contract_future.await;
this.send(CfdRollOverCompleted { order_id, dlc }) this.send(CfdRollOverCompleted { order_id, dlc })
.await .await
.expect("always connected to ourselves") .expect("always connected to ourselves")
}); }
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
_task: task,
};
self.remove_pending_proposal(&order_id) self.remove_pending_proposal(&order_id)
.context("Could not remove accepted roll over")?; .context("Could not remove accepted roll over")?;

22
daemon/tests/harness/mod.rs

@ -7,7 +7,9 @@ use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin}; use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::{Price, Usd}; use daemon::model::{Price, Usd};
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem}; use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem};
use futures::future::RemoteHandle;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -47,6 +49,7 @@ pub struct Maker {
pub mocks: mocks::Mocks, pub mocks: mocks::Mocks,
pub listen_addr: SocketAddr, pub listen_addr: SocketAddr,
pub identity_pk: x25519_dalek::PublicKey, pub identity_pk: x25519_dalek::PublicKey,
_tasks: Vec<RemoteHandle<()>>,
} }
impl Maker { impl Maker {
@ -64,8 +67,10 @@ impl Maker {
let mut mocks = mocks::Mocks::default(); let mut mocks = mocks::Mocks::default();
let (oracle, monitor, wallet) = mocks::create_actors(&mocks); let (oracle, monitor, wallet) = mocks::create_actors(&mocks);
let mut tasks = vec![];
let (wallet_addr, wallet_fut) = wallet.create(None).run(); let (wallet_addr, wallet_fut) = wallet.create(None).run();
tokio::spawn(wallet_fut); tasks.push(wallet_fut.spawn_with_handle());
let settlement_time_interval_hours = time::Duration::hours(24); let settlement_time_interval_hours = time::Duration::hours(24);
@ -109,13 +114,20 @@ impl Maker {
Poll::Ready(Some(message)) Poll::Ready(Some(message))
}); });
tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream)); tasks.push(
maker
.inc_conn_addr
.clone()
.attach_stream(listener_stream)
.spawn_with_handle(),
);
Self { Self {
system: maker, system: maker,
identity_pk, identity_pk,
listen_addr: address, listen_addr: address,
mocks, mocks,
_tasks: tasks,
} }
} }
@ -153,6 +165,7 @@ impl Maker {
pub struct Taker { pub struct Taker {
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>, pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks, pub mocks: mocks::Mocks,
_tasks: Vec<RemoteHandle<()>>,
} }
impl Taker { impl Taker {
@ -182,8 +195,10 @@ impl Taker {
let mut mocks = mocks::Mocks::default(); let mut mocks = mocks::Mocks::default();
let (oracle, monitor, wallet) = mocks::create_actors(&mocks); let (oracle, monitor, wallet) = mocks::create_actors(&mocks);
let mut tasks = vec![];
let (wallet_addr, wallet_fut) = wallet.create(None).run(); let (wallet_addr, wallet_fut) = wallet.create(None).run();
tokio::spawn(wallet_fut); tasks.push(wallet_fut.spawn_with_handle());
// system startup sends sync messages, mock them // system startup sends sync messages, mock them
mocks.mock_sync_handlers().await; mocks.mock_sync_handlers().await;
@ -213,6 +228,7 @@ impl Taker {
Self { Self {
system: taker, system: taker,
mocks, mocks,
_tasks: tasks,
} }
} }

Loading…
Cancel
Save