Browse Source

Introduce a supervisor actor that can restart another actor

feature/supervised-connection
Thomas Eizinger 3 years ago
parent
commit
bc92cf1fc2
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 2
      Cargo.lock
  2. 93
      daemon/src/bitmex_price_feed.rs
  3. 1
      daemon/src/lib.rs
  4. 15
      daemon/src/maker.rs
  5. 90
      daemon/src/supervisor.rs
  6. 15
      daemon/src/taker.rs

2
Cargo.lock

@ -3841,7 +3841,7 @@ dependencies = [
[[package]]
name = "xtra_productivity"
version = "0.1.0"
source = "git+https://github.com/comit-network/xtra-productivity#ee789b130f2d20b38f57434fe6ed18c4fdf1db4a"
source = "git+https://github.com/comit-network/xtra-productivity#52492681bb260a5789162e70cb5067994f051360"
dependencies = [
"quote",
"syn",

93
daemon/src/bitmex_price_feed.rs

@ -1,6 +1,7 @@
use crate::model::{Price, Timestamp};
use crate::{projection, Tasks};
use crate::{projection, supervisor, Tasks};
use anyhow::Result;
use async_trait::async_trait;
use futures::{SinkExt, TryStreamExt};
use rust_decimal::Decimal;
use std::convert::TryFrom;
@ -14,49 +15,43 @@ const URL: &str = "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD";
pub struct Actor {
tasks: Tasks,
receiver: Box<dyn MessageChannel<projection::Update<Quote>>>,
supervisor: xtra::Address<supervisor::Actor<Self, StopReason>>,
}
impl Actor {
pub fn new(receiver: impl MessageChannel<projection::Update<Quote>> + 'static) -> Self {
pub fn new(
receiver: impl MessageChannel<projection::Update<Quote>> + 'static,
supervisor: xtra::Address<supervisor::Actor<Self, StopReason>>,
) -> Self {
Self {
tasks: Tasks::default(),
receiver: Box::new(receiver),
supervisor,
}
}
}
impl xtra::Actor for Actor {}
#[xtra_productivity]
impl Actor {
async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context<Self>) {
match msg {
NotifyNoConnection::Failed { error } => {
tracing::warn!("Connection to BitMex realtime API failed: {}", error)
}
NotifyNoConnection::StreamEnded => {
tracing::warn!("Connection to BitMex realtime API closed")
}
}
let this = ctx.address().expect("we are alive");
self.tasks.add(connect_until_successful(this));
}
async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<()> {
tracing::debug!("Connecting to BitMex realtime API");
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
self.tasks.add({
let this = ctx.address().expect("we are alive");
let receiver = self.receiver.clone_channel();
let (mut connection, _) = tokio_tungstenite::connect_async(URL).await?;
async move {
tracing::debug!("Connecting to BitMex realtime API");
tracing::info!("Connected to BitMex realtime API");
let mut connection = match tokio_tungstenite::connect_async(URL).await {
Ok((connection, _)) => connection,
Err(e) => {
let _ = this.send(StopReason::FailedToConnect { source: e }).await;
return
}
};
let this = ctx.address().expect("we are alive");
tracing::info!("Connected to BitMex realtime API");
self.tasks.add({
let receiver = self.receiver.clone_channel();
async move {
let no_connection = loop {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {
tracing::trace!("No message from BitMex in the last 5 seconds, pinging");
@ -89,38 +84,40 @@ impl Actor {
continue;
}
Ok(None) => {
break NotifyNoConnection::StreamEnded
let _ = this.send(StopReason::StreamEnded).await;
return;
}
Err(e) => {
break NotifyNoConnection::Failed { error: e }
let _ = this.send(StopReason::Failed { source: e }).await;
return;
}
}
},
}
};
let _ = this.send(no_connection).await;
}
}
});
Ok(())
}
}
async fn connect_until_successful(this: xtra::Address<Actor>) {
while let Err(e) = this
.send(Connect)
.await
.expect("always connected to ourselves")
{
tracing::warn!("Failed to connect to BitMex realtime API: {:#}", e);
#[xtra_productivity]
impl Actor {
async fn handle(&mut self, msg: StopReason, ctx: &mut xtra::Context<Self>) {
let _ = self
.supervisor
.send(supervisor::Stopped { reason: msg })
.await;
ctx.stop();
}
}
pub struct Connect;
enum NotifyNoConnection {
Failed { error: tungstenite::Error },
#[derive(thiserror::Error, Debug)]
pub enum StopReason {
#[error("Connection to BitMex API failed")]
Failed { source: tungstenite::Error },
#[error("Failed to connect to BitMex API")]
FailedToConnect { source: tungstenite::Error },
#[error("Websocket stream to BitMex API closed")]
StreamEnded,
}

1
daemon/src/lib.rs

@ -49,6 +49,7 @@ pub mod send_to_socket;
pub mod setup_contract;
pub mod setup_maker;
pub mod setup_taker;
pub mod supervisor;
pub mod taker_cfd;
pub mod to_sse_event;
pub mod tokio_ext;

15
daemon/src/maker.rs

@ -8,7 +8,7 @@ use daemon::model::cfd::Role;
use daemon::seed::Seed;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle,
projection, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
projection, supervisor, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
SETTLEMENT_INTERVAL,
};
use sqlx::sqlite::SqliteConnectOptions;
@ -257,14 +257,13 @@ async fn main() -> Result<()> {
)
.await?;
let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor)
.create(None)
.run();
tasks.add(task);
let (supervisor, _price_feed) = supervisor::Actor::new(
move |supervisor| bitmex_price_feed::Actor::new(projection_actor.clone(), supervisor),
|_| true, // always restart price feed actor
);
price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let (_supervisor_address, task) = supervisor.create(None).run();
tasks.add(task);
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?;

90
daemon/src/supervisor.rs

@ -0,0 +1,90 @@
use crate::Tasks;
use async_trait::async_trait;
use std::fmt;
use xtra::{Address, Context};
use xtra_productivity::xtra_productivity;
/// A supervising actor reacts to messages from the actor it is supervising and restarts it based on
/// a given policy.
pub struct Actor<T, R> {
context: Context<T>,
ctor: Box<dyn Fn(Address<Self>) -> T + Send + 'static>,
tasks: Tasks,
restart_policy: Box<dyn FnMut(R) -> bool + Send + 'static>,
}
impl<T, R> Actor<T, R>
where
T: xtra::Actor,
R: fmt::Display + 'static,
{
/// Construct a new supervisor.
///
/// The supervisor needs to know two things:
/// 1. How to construct an instance of the actor.
/// 2. When to construct an instance of the actor.
pub fn new(
ctor: impl (Fn(Address<Self>) -> T) + Send + 'static,
restart_policy: impl (FnMut(R) -> bool) + Send + 'static,
) -> (Self, Address<T>) {
let (address, context) = Context::new(None);
let supervisor = Self {
context,
ctor: Box::new(ctor),
tasks: Tasks::default(),
restart_policy: Box::new(restart_policy),
};
(supervisor, address)
}
fn spawn_new(&mut self, ctx: &mut Context<Self>) {
tracing::info!("Spawning new instance of actor"); // TODO: Include name
let this = ctx.address().expect("we are alive");
let actor = (self.ctor)(this);
self.tasks.add(self.context.attach(actor));
}
}
#[async_trait]
impl<T, R> xtra::Actor for Actor<T, R>
where
T: xtra::Actor,
R: fmt::Display + 'static,
{
async fn started(&mut self, ctx: &mut Context<Self>) {
self.spawn_new(ctx);
}
}
#[xtra_productivity]
impl<T, R> Actor<T, R>
where
T: xtra::Actor,
R: fmt::Display + 'static,
{
pub fn handle(&mut self, msg: Stopped<R>, ctx: &mut Context<Self>) {
let reason = msg.reason;
tracing::info!("Actor stopped: {}", reason); // TODO: Include name of actor
let should_restart = (self.restart_policy)(reason);
tracing::debug!("Restart actor? {}", should_restart);
if should_restart {
self.spawn_new(ctx)
}
}
}
/// Tell the supervisor that the actor was stopped.
///
/// The given `reason` will be passed to the `restart_policy` configured in the supervisor. If it
/// yields `true`, a new instance of the actor will be spawned.
pub struct Stopped<R> {
pub reason: R,
}

15
daemon/src/taker.rs

@ -8,7 +8,7 @@ use daemon::model::cfd::Role;
use daemon::model::Identity;
use daemon::seed::Seed;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet,
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, supervisor, wallet,
TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL,
};
use sqlx::sqlite::SqliteConnectOptions;
@ -241,14 +241,13 @@ async fn main() -> Result<()> {
)
.await?;
let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor)
.create(None)
.run();
tasks.add(task);
let (supervisor, _price_feed) = supervisor::Actor::new(
move |supervisor| bitmex_price_feed::Actor::new(projection_actor.clone(), supervisor),
|_| true, // always restart price feed actor
);
price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let (_supervisor_address, task) = supervisor.create(None).run();
tasks.add(task);
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?;

Loading…
Cancel
Save