Browse Source

Merge #607

607: Don't use tokio::spawn r=klochowicz a=klochowicz

see commit descriptions for more info

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
rollover-test-2
bors[bot] 3 years ago
committed by GitHub
parent
commit
753fa92005
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      Cargo.lock
  2. 2
      daemon/Cargo.toml
  3. 3
      daemon/clippy.toml
  4. 21
      daemon/src/connection.rs
  5. 105
      daemon/src/lib.rs
  6. 10
      daemon/src/maker.rs
  7. 30
      daemon/src/maker_cfd.rs
  8. 36
      daemon/src/maker_inc_connections.rs
  9. 10
      daemon/src/taker.rs
  10. 30
      daemon/src/taker_cfd.rs
  11. 48
      daemon/src/tokio_ext.rs
  12. 16
      daemon/tests/harness/mod.rs

1
Cargo.lock

@ -3826,7 +3826,6 @@ dependencies = [
"futures-timer",
"futures-util",
"pollster",
"tokio",
]
[[package]]

2
daemon/Cargo.toml

@ -45,7 +45,7 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
x25519-dalek = { version = "1.1" }
xtra = { version = "0.6", features = ["with-tokio-1"] }
xtra = { version = "0.6" }
xtra_productivity = { version = "0.1.0" }
[[bin]]

3
daemon/clippy.toml

@ -0,0 +1,3 @@
disallowed-methods = [
"tokio::spawn", # tasks can outlive the actor system, prefer spawn_with_handle()
]

21
daemon/src/connection.rs

@ -1,7 +1,5 @@
use crate::tokio_ext::FutureExt;
use crate::{log_error, noise, send_to_socket, wire};
use crate::{log_error, noise, send_to_socket, wire, Tasks};
use anyhow::Result;
use futures::future::RemoteHandle;
use futures::StreamExt;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
@ -15,7 +13,7 @@ use xtra_productivity::xtra_productivity;
struct ConnectedState {
last_heartbeat: SystemTime,
_pulse_handle: RemoteHandle<()>,
_tasks: Tasks,
}
pub struct Actor {
@ -97,22 +95,23 @@ impl Actor {
let send_to_socket = send_to_socket::Actor::new(write, noise.clone());
tokio::spawn(self.send_to_maker_ctx.attach(send_to_socket));
let mut tasks = Tasks::default();
tasks.add(self.send_to_maker_ctx.attach(send_to_socket));
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise))
.map(move |item| MakerStreamMessage { item });
let this = ctx.address().expect("self to be alive");
tokio::spawn(this.attach_stream(read));
tasks.add(this.attach_stream(read));
let pulse_remote_handle = ctx
.notify_interval(self.timeout, || MeasurePulse)
.expect("we just started")
.spawn_with_handle();
tasks.add(
ctx.notify_interval(self.timeout, || MeasurePulse)
.expect("we just started"),
);
self.connected_state = Some(ConnectedState {
last_heartbeat: SystemTime::now(),
_pulse_handle: pulse_remote_handle,
_tasks: tasks,
});
self.status_sender
.send(ConnectionStatus::Online)

105
daemon/src/lib.rs

@ -1,6 +1,5 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))]
#![warn(clippy::disallowed_method)]
use crate::db::load_all_cfds;
use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Cfd, Order, UpdateCfdProposals};
@ -58,6 +57,22 @@ pub const N_PAYOUTS: usize = 200;
/// If it gets dropped, all tasks are cancelled.
pub struct Tasks(Vec<RemoteHandle<()>>);
impl Tasks {
/// Spawn the task on the runtime and remembers the handle
/// NOTE: Do *not* call spawn_with_handle() before calling `add`,
/// such calls will trigger panic in debug mode.
pub fn add(&mut self, f: impl Future<Output = ()> + Send + 'static) {
let handle = f.spawn_with_handle();
self.0.push(handle);
}
}
impl Default for Tasks {
fn default() -> Self {
Tasks(vec![])
}
}
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
@ -113,7 +128,7 @@ where
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let mut tasks = vec![];
let mut tasks = Tasks::default();
let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new(
db,
@ -131,46 +146,35 @@ where
.create(None)
.run();
tasks.push(cfd_actor_fut.spawn_with_handle());
tasks.add(cfd_actor_fut);
tasks.push(
inc_conn_ctx
.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
))
.spawn_with_handle(),
);
tasks.add(inc_conn_ctx.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
)));
tasks.push(
tasks.add(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?
.spawn_with_handle(),
.map_err(|e| anyhow::anyhow!(e))?,
);
tasks.push(
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?)
.spawn_with_handle(),
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.push(
tasks.add(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?
.spawn_with_handle(),
.map_err(|e| anyhow::anyhow!(e))?,
);
let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.run();
tasks.push(fan_out_actor_fut.spawn_with_handle());
tasks.add(fan_out_actor_fut);
tasks.push(
oracle_ctx
.run(oracle_constructor(cfds, Box::new(fan_out_actor)))
.spawn_with_handle(),
);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
oracle_addr.send(oracle::Sync).await?;
@ -182,7 +186,7 @@ where
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr,
tasks: Tasks(tasks),
tasks,
})
}
}
@ -240,7 +244,7 @@ where
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let mut tasks = vec![];
let mut tasks = Tasks::default();
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new(
@ -258,36 +262,29 @@ where
.create(None)
.run();
tasks.push(cfd_actor_fut.spawn_with_handle());
tasks.push(
connection_actor_ctx
.run(connection::Actor::new(
maker_online_status_feed_sender,
Box::new(cfd_actor_addr.clone()),
identity_sk,
maker_heartbeat_interval,
))
.spawn_with_handle(),
);
tasks.add(cfd_actor_fut);
tasks.push(
tasks.add(connection_actor_ctx.run(connection::Actor::new(
maker_online_status_feed_sender,
Box::new(cfd_actor_addr.clone()),
identity_sk,
maker_heartbeat_interval,
)));
tasks.add(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?
.spawn_with_handle(),
.map_err(|e| anyhow::anyhow!(e))?,
);
tasks.push(
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?)
.spawn_with_handle(),
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.push(
tasks.add(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?
.spawn_with_handle(),
.map_err(|e| anyhow::anyhow!(e))?,
);
let (fan_out_actor, fan_out_actor_fut) =
@ -295,13 +292,9 @@ where
.create(None)
.run();
tasks.push(fan_out_actor_fut.spawn_with_handle());
tasks.add(fan_out_actor_fut);
tasks.push(
oracle_ctx
.run(oracle_constructor(cfds, Box::new(fan_out_actor)))
.spawn_with_handle(),
);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
tracing::debug!("Taker actor system ready");
@ -312,7 +305,7 @@ where
order_feed_receiver,
update_cfd_feed_receiver,
maker_online_status_feed_receiver,
tasks: Tasks(tasks),
tasks,
})
}
}

10
daemon/src/maker.rs

@ -9,7 +9,7 @@ use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
wallet, wallet_sync, MakerActorSystem, HEARTBEAT_INTERVAL, N_PAYOUTS,
wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -223,8 +223,10 @@ async fn main() -> Result<()> {
tracing::info!("Listening on {}", local_addr);
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task);
tasks.add(task);
let db = SqlitePool::connect_with(
SqliteConnectOptions::new()
@ -282,9 +284,9 @@ async fn main() -> Result<()> {
Poll::Ready(Some(message))
});
tokio::spawn(incoming_connection_addr.attach_stream(listener_stream));
tasks.add(incoming_connection_addr.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tasks.add(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel = MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);

30
daemon/src/maker_cfd.rs

@ -8,11 +8,13 @@ use crate::model::cfd::{
};
use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams;
use crate::tokio_ext::FutureExt;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use maia::secp256k1_zkp::Signature;
use sqlx::pool::PoolConnection;
@ -81,6 +83,7 @@ enum SetupState {
Active {
taker: TakerId,
sender: mpsc::UnboundedSender<wire::SetupMsg>,
_task: RemoteHandle<()>,
},
None,
}
@ -89,6 +92,7 @@ enum RollOverState {
Active {
taker: TakerId,
sender: mpsc::UnboundedSender<wire::RollOverMsg>,
_task: RemoteHandle<()>,
},
None,
}
@ -198,7 +202,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
msg: wire::SetupMsg,
) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { taker, sender } if taker_id == *taker => {
SetupState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?;
}
SetupState::Active { taker, .. } => {
@ -218,7 +222,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
msg: wire::RollOverMsg,
) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { taker, sender } if taker_id == *taker => {
RollOverState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?;
}
RollOverState::Active { taker, .. } => {
@ -622,18 +626,20 @@ where
.address()
.expect("actor to be able to give address to itself");
tokio::spawn(async move {
let task = async move {
let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc })
.await
.expect("always connected to ourselves");
});
}
.spawn_with_handle();
// 6. Record that we are in an active contract setup
self.setup_state = SetupState::Active {
sender,
taker: taker_id,
_task: task,
};
Ok(())
@ -785,18 +791,20 @@ where
.address()
.expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active {
sender,
taker: taker_id,
};
tokio::spawn(async move {
let task = async move {
let dlc = contract_future.await;
this.send(CfdRollOverCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
});
}
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
taker: taker_id,
_task: task,
};
self.remove_pending_proposal(&order_id)
.context("accepted roll_over")?;

36
daemon/src/maker_inc_connections.rs

@ -3,9 +3,8 @@ use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::noise::TransportStateExt;
use crate::tokio_ext::FutureExt;
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire};
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, Tasks};
use anyhow::Result;
use futures::future::RemoteHandle;
use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::io;
@ -72,7 +71,7 @@ pub struct Actor {
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration,
tasks: Vec<RemoteHandle<()>>,
tasks: Tasks,
}
impl Actor {
@ -88,7 +87,7 @@ impl Actor {
taker_msg_channel: taker_msg_channel.clone_channel(),
noise_priv_key,
heartbeat_interval,
tasks: Vec::new(),
tasks: Tasks::default(),
}
}
@ -136,29 +135,26 @@ impl Actor {
forward_only_ok::Actor::new(self.taker_msg_channel.clone_channel())
.create(None)
.run();
self.tasks.push(forward_to_cfd_fut.spawn_with_handle());
self.tasks.add(forward_to_cfd_fut);
// only allow outgoing messages while we are successfully reading incoming ones
let heartbeat_interval = self.heartbeat_interval;
self.tasks.push(
async move {
let mut actor = send_to_socket::Actor::new(write, transport_state.clone());
self.tasks.add(async move {
let mut actor = send_to_socket::Actor::new(write, transport_state.clone());
let _heartbeat_handle = out_msg_actor_context
.notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat)
.expect("actor not to shutdown")
.spawn_with_handle();
let _heartbeat_handle = out_msg_actor_context
.notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat)
.expect("actor not to shutdown")
.spawn_with_handle();
out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read))
.await;
out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read))
.await;
tracing::error!("Closing connection to taker {}", taker_id);
tracing::error!("Closing connection to taker {}", taker_id);
actor.shutdown().await;
}
.spawn_with_handle(),
);
actor.shutdown().await;
});
self.write_connections
.insert(taker_id, out_msg_actor_address);

10
daemon/src/taker.rs

@ -9,7 +9,7 @@ use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem, HEARTBEAT_INTERVAL, N_PAYOUTS,
wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -207,8 +207,10 @@ async fn main() -> Result<()> {
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task);
tasks.add(task);
let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip()))
@ -258,7 +260,7 @@ async fn main() -> Result<()> {
connect(connection_actor_addr, opts.maker_id, opts.maker).await?;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tasks.add(wallet_sync::new(wallet, wallet_feed_sender));
let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
@ -290,7 +292,7 @@ async fn main() -> Result<()> {
let shutdown_handle = rocket.shutdown();
// shutdown the rocket server maker if goes offline
tokio::spawn(async move {
tasks.add(async move {
loop {
maker_online_status_feed_receiver.changed().await.unwrap();
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline {

30
daemon/src/taker_cfd.rs

@ -7,12 +7,14 @@ use crate::model::cfd::{
};
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams};
use crate::tokio_ext::FutureExt;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wallet, wire};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use std::collections::HashMap;
use tokio::sync::watch;
@ -49,6 +51,7 @@ pub struct CfdRollOverCompleted {
enum SetupState {
Active {
sender: mpsc::UnboundedSender<SetupMsg>,
_task: RemoteHandle<()>,
},
None,
}
@ -56,6 +59,7 @@ enum SetupState {
enum RollOverState {
Active {
sender: mpsc::UnboundedSender<RollOverMsg>,
_task: RemoteHandle<()>,
},
None,
}
@ -252,7 +256,7 @@ where
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
SetupState::Active { sender, .. } => {
sender.send(msg).await?;
}
SetupState::None => {
@ -265,7 +269,7 @@ where
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
RollOverState::Active { sender, .. } => {
sender.send(msg).await?;
}
RollOverState::None => {
@ -515,15 +519,19 @@ where
.address()
.expect("actor to be able to give address to itself");
tokio::spawn(async move {
let task = async move {
let dlc = contract_future.await;
this.send(CfdSetupCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
});
}
.spawn_with_handle();
self.setup_state = SetupState::Active { sender };
self.setup_state = SetupState::Active {
sender,
_task: task,
};
Ok(())
}
@ -578,15 +586,19 @@ where
.address()
.expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active { sender };
tokio::spawn(async move {
let task = async move {
let dlc = contract_future.await;
this.send(CfdRollOverCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
});
}
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
_task: task,
};
self.remove_pending_proposal(&order_id)
.context("Could not remove accepted roll over")?;

48
daemon/src/tokio_ext.rs

@ -1,5 +1,6 @@
use futures::future::RemoteHandle;
use futures::FutureExt as _;
use std::any::{Any, TypeId};
use std::fmt;
use std::future::Future;
use std::time::Duration;
@ -10,6 +11,8 @@ where
F: Future<Output = Result<(), E>> + Send + 'static,
E: fmt::Display,
{
// we want to disallow calls to tokio::spawn outside FutureExt
#[allow(clippy::disallowed_method)]
tokio::spawn(async move {
if let Err(e) = future.await {
tracing::warn!("Task failed: {:#}", e);
@ -24,7 +27,7 @@ pub trait FutureExt: Future + Sized {
/// The task will be stopped when the handle gets dropped.
fn spawn_with_handle(self) -> RemoteHandle<Self::Output>
where
Self: Future<Output = ()> + Send + 'static;
Self: Future<Output = ()> + Send + Any + 'static;
}
impl<F> FutureExt for F
@ -37,10 +40,51 @@ where
fn spawn_with_handle(self) -> RemoteHandle<()>
where
Self: Future<Output = ()> + Send + 'static,
Self: Future<Output = ()> + Send + Any + 'static,
{
debug_assert!(
TypeId::of::<RemoteHandle<()>>() != self.type_id(),
"RemoteHandle<()> is a handle to already spawned task",
);
let (future, handle) = self.remote_handle();
// we want to disallow calls to tokio::spawn outside FutureExt
#[allow(clippy::disallowed_method)]
tokio::spawn(future);
handle
}
}
#[cfg(test)]
mod tests {
use std::panic;
use tokio::time::sleep;
use super::*;
#[tokio::test]
async fn spawning_a_regular_future_does_not_panic() {
let result = panic::catch_unwind(|| {
let _handle = sleep(Duration::from_secs(2)).spawn_with_handle();
});
assert!(result.is_ok());
}
#[tokio::test]
async fn panics_when_called_spawn_with_handle_on_remote_handle() {
let result = panic::catch_unwind(|| {
let handle = sleep(Duration::from_secs(2)).spawn_with_handle();
let _handle_to_a_handle = handle.spawn_with_handle();
});
if cfg!(debug_assertions) {
assert!(
result.is_err(),
"Spawning a remote handle into a separate task should panic_in_debug_mode"
);
} else {
assert!(result.is_ok(), "Do not panic in release mode");
}
}
}

16
daemon/tests/harness/mod.rs

@ -7,7 +7,7 @@ use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::{Price, Usd};
use daemon::seed::Seed;
use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem};
use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem, Tasks};
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::net::SocketAddr;
@ -47,6 +47,7 @@ pub struct Maker {
pub mocks: mocks::Mocks,
pub listen_addr: SocketAddr,
pub identity_pk: x25519_dalek::PublicKey,
_tasks: Tasks,
}
impl Maker {
@ -64,8 +65,10 @@ impl Maker {
let mut mocks = mocks::Mocks::default();
let (oracle, monitor, wallet) = mocks::create_actors(&mocks);
let mut tasks = Tasks::default();
let (wallet_addr, wallet_fut) = wallet.create(None).run();
tokio::spawn(wallet_fut);
tasks.add(wallet_fut);
let settlement_time_interval_hours = time::Duration::hours(24);
@ -109,13 +112,14 @@ impl Maker {
Poll::Ready(Some(message))
});
tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream));
tasks.add(maker.inc_conn_addr.clone().attach_stream(listener_stream));
Self {
system: maker,
identity_pk,
listen_addr: address,
mocks,
_tasks: tasks,
}
}
@ -153,6 +157,7 @@ impl Maker {
pub struct Taker {
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks,
_tasks: Tasks,
}
impl Taker {
@ -182,8 +187,10 @@ impl Taker {
let mut mocks = mocks::Mocks::default();
let (oracle, monitor, wallet) = mocks::create_actors(&mocks);
let mut tasks = Tasks::default();
let (wallet_addr, wallet_fut) = wallet.create(None).run();
tokio::spawn(wallet_fut);
tasks.add(wallet_fut);
// system startup sends sync messages, mock them
mocks.mock_sync_handlers().await;
@ -213,6 +220,7 @@ impl Taker {
Self {
system: taker,
mocks,
_tasks: tasks,
}
}

Loading…
Cancel
Save