diff --git a/Cargo.lock b/Cargo.lock index 9418fa3..52782a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3841,7 +3841,7 @@ dependencies = [ [[package]] name = "xtra_productivity" version = "0.1.0" -source = "git+https://github.com/comit-network/xtra-productivity#ee789b130f2d20b38f57434fe6ed18c4fdf1db4a" +source = "git+https://github.com/comit-network/xtra-productivity#52492681bb260a5789162e70cb5067994f051360" dependencies = [ "quote", "syn", diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index 4f569a7..d0d5115 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,6 +1,7 @@ use crate::model::{Price, Timestamp}; -use crate::{projection, Tasks}; +use crate::{projection, supervisor, Tasks}; use anyhow::Result; +use async_trait::async_trait; use futures::{SinkExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; @@ -14,49 +15,43 @@ const URL: &str = "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD"; pub struct Actor { tasks: Tasks, receiver: Box>>, + supervisor: xtra::Address>, } impl Actor { - pub fn new(receiver: impl MessageChannel> + 'static) -> Self { + pub fn new( + receiver: impl MessageChannel> + 'static, + supervisor: xtra::Address>, + ) -> Self { Self { tasks: Tasks::default(), receiver: Box::new(receiver), + supervisor, } } } -impl xtra::Actor for Actor {} - -#[xtra_productivity] -impl Actor { - async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context) { - match msg { - NotifyNoConnection::Failed { error } => { - tracing::warn!("Connection to BitMex realtime API failed: {}", error) - } - NotifyNoConnection::StreamEnded => { - tracing::warn!("Connection to BitMex realtime API closed") - } - } - - let this = ctx.address().expect("we are alive"); - - self.tasks.add(connect_until_successful(this)); - } - - async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result<()> { - tracing::debug!("Connecting to BitMex realtime API"); +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + self.tasks.add({ + let this = ctx.address().expect("we are alive"); + let receiver = self.receiver.clone_channel(); - let (mut connection, _) = tokio_tungstenite::connect_async(URL).await?; + async move { + tracing::debug!("Connecting to BitMex realtime API"); - tracing::info!("Connected to BitMex realtime API"); + let mut connection = match tokio_tungstenite::connect_async(URL).await { + Ok((connection, _)) => connection, + Err(e) => { + let _ = this.send(StopReason::FailedToConnect { source: e }).await; + return + } + }; - let this = ctx.address().expect("we are alive"); + tracing::info!("Connected to BitMex realtime API"); - self.tasks.add({ - let receiver = self.receiver.clone_channel(); - async move { - let no_connection = loop { + loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(5)) => { tracing::trace!("No message from BitMex in the last 5 seconds, pinging"); @@ -89,38 +84,40 @@ impl Actor { continue; } Ok(None) => { - break NotifyNoConnection::StreamEnded + let _ = this.send(StopReason::StreamEnded).await; + return; } Err(e) => { - break NotifyNoConnection::Failed { error: e } + let _ = this.send(StopReason::Failed { source: e }).await; + return; } } }, } - }; - - let _ = this.send(no_connection).await; + } } }); - - Ok(()) } } -async fn connect_until_successful(this: xtra::Address) { - while let Err(e) = this - .send(Connect) - .await - .expect("always connected to ourselves") - { - tracing::warn!("Failed to connect to BitMex realtime API: {:#}", e); +#[xtra_productivity] +impl Actor { + async fn handle(&mut self, msg: StopReason, ctx: &mut xtra::Context) { + let _ = self + .supervisor + .send(supervisor::Stopped { reason: msg }) + .await; + ctx.stop(); } } -pub struct Connect; - -enum NotifyNoConnection { - Failed { error: tungstenite::Error }, +#[derive(thiserror::Error, Debug)] +pub enum StopReason { + #[error("Connection to BitMex API failed")] + Failed { source: tungstenite::Error }, + #[error("Failed to connect to BitMex API")] + FailedToConnect { source: tungstenite::Error }, + #[error("Websocket stream to BitMex API closed")] StreamEnded, } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index e0c5d97..6f03456 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -49,6 +49,7 @@ pub mod send_to_socket; pub mod setup_contract; pub mod setup_maker; pub mod setup_taker; +pub mod supervisor; pub mod taker_cfd; pub mod to_sse_event; pub mod tokio_ext; diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 378a5d0..a70aa1d 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -8,7 +8,7 @@ use daemon::model::cfd::Role; use daemon::seed::Seed; use daemon::{ bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle, - projection, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + projection, supervisor, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, }; use sqlx::sqlite::SqliteConnectOptions; @@ -257,14 +257,13 @@ async fn main() -> Result<()> { ) .await?; - let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor) - .create(None) - .run(); - tasks.add(task); + let (supervisor, _price_feed) = supervisor::Actor::new( + move |supervisor| bitmex_price_feed::Actor::new(projection_actor.clone(), supervisor), + |_| true, // always restart price feed actor + ); - price_feed_address - .send(bitmex_price_feed::Connect) - .await??; + let (_supervisor_address, task) = supervisor.create(None).run(); + tasks.add(task); let (proj_actor, projection_feeds) = projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?; diff --git a/daemon/src/supervisor.rs b/daemon/src/supervisor.rs new file mode 100644 index 0000000..71783d3 --- /dev/null +++ b/daemon/src/supervisor.rs @@ -0,0 +1,90 @@ +use crate::Tasks; +use async_trait::async_trait; +use std::fmt; +use xtra::{Address, Context}; +use xtra_productivity::xtra_productivity; + +/// A supervising actor reacts to messages from the actor it is supervising and restarts it based on +/// a given policy. +pub struct Actor { + context: Context, + ctor: Box) -> T + Send + 'static>, + tasks: Tasks, + restart_policy: Box bool + Send + 'static>, +} + +impl Actor +where + T: xtra::Actor, + R: fmt::Display + 'static, +{ + /// Construct a new supervisor. + /// + /// The supervisor needs to know two things: + /// 1. How to construct an instance of the actor. + /// 2. When to construct an instance of the actor. + pub fn new( + ctor: impl (Fn(Address) -> T) + Send + 'static, + restart_policy: impl (FnMut(R) -> bool) + Send + 'static, + ) -> (Self, Address) { + let (address, context) = Context::new(None); + + let supervisor = Self { + context, + ctor: Box::new(ctor), + tasks: Tasks::default(), + restart_policy: Box::new(restart_policy), + }; + + (supervisor, address) + } + + fn spawn_new(&mut self, ctx: &mut Context) { + tracing::info!("Spawning new instance of actor"); // TODO: Include name + + let this = ctx.address().expect("we are alive"); + let actor = (self.ctor)(this); + + self.tasks.add(self.context.attach(actor)); + } +} + +#[async_trait] +impl xtra::Actor for Actor +where + T: xtra::Actor, + R: fmt::Display + 'static, +{ + async fn started(&mut self, ctx: &mut Context) { + self.spawn_new(ctx); + } +} + +#[xtra_productivity] +impl Actor +where + T: xtra::Actor, + R: fmt::Display + 'static, +{ + pub fn handle(&mut self, msg: Stopped, ctx: &mut Context) { + let reason = msg.reason; + + tracing::info!("Actor stopped: {}", reason); // TODO: Include name of actor + + let should_restart = (self.restart_policy)(reason); + + tracing::debug!("Restart actor? {}", should_restart); + + if should_restart { + self.spawn_new(ctx) + } + } +} + +/// Tell the supervisor that the actor was stopped. +/// +/// The given `reason` will be passed to the `restart_policy` configured in the supervisor. If it +/// yields `true`, a new instance of the actor will be spawned. +pub struct Stopped { + pub reason: R, +} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cba7380..78851f9 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -8,7 +8,7 @@ use daemon::model::cfd::Role; use daemon::model::Identity; use daemon::seed::Seed; use daemon::{ - bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, + bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, supervisor, wallet, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, }; use sqlx::sqlite::SqliteConnectOptions; @@ -241,14 +241,13 @@ async fn main() -> Result<()> { ) .await?; - let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor) - .create(None) - .run(); - tasks.add(task); + let (supervisor, _price_feed) = supervisor::Actor::new( + move |supervisor| bitmex_price_feed::Actor::new(projection_actor.clone(), supervisor), + |_| true, // always restart price feed actor + ); - price_feed_address - .send(bitmex_price_feed::Connect) - .await??; + let (_supervisor_address, task) = supervisor.create(None).run(); + tasks.add(task); let (proj_actor, projection_feeds) = projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?;