Browse Source

Use projection actor to publish information about connected takers

Note: Projection actor is used across both taker and maker, so it really should
    publish information about connected counterparty.
    Taker should use that channel for deriving maker online status.
debug-collab-settlement
Mariusz Klochowicz 3 years ago
parent
commit
1e1add8afd
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 7
      daemon/src/lib.rs
  2. 15
      daemon/src/maker.rs
  3. 25
      daemon/src/maker_cfd.rs
  4. 9
      daemon/src/projection.rs
  5. 9
      daemon/src/taker.rs
  6. 28
      daemon/tests/harness/mod.rs

7
daemon/src/lib.rs

@ -10,7 +10,6 @@ use connection::ConnectionStatus;
use futures::future::RemoteHandle;
use maia::secp256k1_zkp::schnorrsig;
use maker_cfd::TakerDisconnected;
use model::TakerId;
use sqlx::SqlitePool;
use std::future::Future;
use std::time::Duration;
@ -91,7 +90,6 @@ impl Default for Tasks {
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub connected_takers_feed_receiver: watch::Receiver<Vec<TakerId>>,
pub inc_conn_addr: Address<T>,
pub tasks: Tasks,
}
@ -135,9 +133,6 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(Vec::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
@ -150,7 +145,6 @@ where
settlement_interval,
oracle_pk,
projection_actor,
connected_takers_feed_sender,
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
@ -196,7 +190,6 @@ where
Ok(Self {
cfd_actor_addr,
connected_takers_feed_receiver,
inc_conn_addr,
tasks,
})

15
daemon/src/maker.rs

@ -7,7 +7,7 @@ use daemon::auth::{self, MAKER_USERNAME};
use daemon::bitmex_price_feed::Quote;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::model::{TakerId, WalletInfo};
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
@ -23,7 +23,6 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::Actor;
@ -246,7 +245,6 @@ async fn main() -> Result<()> {
let MakerActorSystem {
cfd_actor_addr,
connected_takers_feed_receiver,
inc_conn_addr: incoming_connection_addr,
tasks: _tasks,
} = MakerActorSystem::new(
@ -279,17 +277,20 @@ async fn main() -> Result<()> {
tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = watch::channel::<Quote>(init_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(Vec::new());
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let listener_stream = futures::stream::poll_fn(move |ctx| {

25
daemon/src/maker_cfd.rs

@ -6,6 +6,7 @@ use crate::model::cfd::{
};
use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams;
use crate::projection::Update;
use crate::setup_contract::{RolloverParams, SetupParams};
use crate::tokio_ext::FutureExt;
use crate::{
@ -23,7 +24,6 @@ use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::{HashMap, HashSet};
use time::Duration;
use tokio::sync::watch;
use xtra::prelude::*;
pub enum CfdAction {
@ -71,7 +71,6 @@ pub struct Actor<O, M, T, W> {
settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
connected_takers_feed_sender: watch::Sender<Vec<TakerId>>,
takers: Address<T>,
current_order_id: Option<OrderId>,
monitor_actor: Address<M>,
@ -81,7 +80,6 @@ pub struct Actor<O, M, T, W> {
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
current_agreed_proposals: HashMap<OrderId, (SettlementProposal, TakerId)>,
// TODO: Keep in projection actor
connected_takers: HashSet<TakerId>,
n_payouts: usize,
}
@ -112,7 +110,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
connected_takers_feed_sender: watch::Sender<Vec<TakerId>>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
@ -124,7 +121,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval,
oracle_pk,
projection_actor,
connected_takers_feed_sender,
takers,
current_order_id: None,
monitor_actor,
@ -287,6 +283,18 @@ impl<O, M, T, W> Actor<O, M, T, W> {
};
Ok((proposal.clone(), *taker_id))
}
async fn update_connected_takers(&mut self) -> Result<()> {
self.projection_actor
.send(Update(
self.connected_takers
.clone()
.into_iter()
.collect::<Vec<TakerId>>(),
))
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
@ -346,9 +354,7 @@ where
if !self.connected_takers.insert(taker_id) {
tracing::warn!("Taker already connected: {:?}", &taker_id);
}
self.connected_takers_feed_sender
.send(self.connected_takers.clone().into_iter().collect())?;
self.update_connected_takers().await?;
Ok(())
}
@ -356,8 +362,7 @@ where
if !self.connected_takers.remove(&taker_id) {
tracing::warn!("Removed unknown taker: {:?}", &taker_id);
}
self.connected_takers_feed_sender
.send(self.connected_takers.clone().into_iter().collect())?;
self.update_connected_takers().await?;
Ok(())
}

9
daemon/src/projection.rs

@ -1,4 +1,5 @@
use crate::bitmex_price_feed::Quote;
use crate::model::TakerId;
use crate::{Cfd, Order, UpdateCfdProposals};
use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
@ -8,6 +9,9 @@ pub struct Actor {
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
tx_connected_takers: watch::Sender<Vec<TakerId>>,
}
impl Actor {
@ -16,12 +20,14 @@ impl Actor {
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
tx_connected_takers: watch::Sender<Vec<TakerId>>,
) -> Self {
Self {
tx_cfds,
tx_order,
tx_quote,
tx_settlements,
tx_connected_takers,
}
}
}
@ -42,6 +48,9 @@ impl Actor {
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx_settlements.send(msg.0);
}
fn handle(&mut self, msg: Update<Vec<TakerId>>) {
let _ = self.tx_connected_takers.send(msg.0);
}
}
impl xtra::Actor for Actor {}

9
daemon/src/taker.rs

@ -7,7 +7,7 @@ use daemon::bitmex_price_feed::Quote;
use daemon::connection::connect;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::model::{TakerId, WalletInfo};
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
@ -264,11 +264,18 @@ async fn main() -> Result<()> {
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
// TODO: Use this channel to convey maker status.
// For now, the receiver is dropped instead of managed by Rocket to
// highlight that we're not using it
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

28
daemon/tests/harness/mod.rs

@ -20,7 +20,6 @@ use std::task::Poll;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt;
@ -59,6 +58,7 @@ pub struct Maker {
pub identity_pk: x25519_dalek::PublicKey,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
connected_takers_feed_receiver: watch::Receiver<Vec<TakerId>>,
_tasks: Tasks,
}
@ -72,7 +72,7 @@ impl Maker {
}
pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver<Vec<TakerId>> {
&mut self.system.connected_takers_feed_receiver
&mut self.connected_takers_feed_receiver
}
pub async fn start(
@ -126,17 +126,20 @@ impl Maker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let address = listener.local_addr().unwrap();
@ -162,6 +165,7 @@ impl Maker {
_tasks: tasks,
cfd_feed_receiver,
order_feed_receiver,
connected_takers_feed_receiver,
}
}
@ -261,16 +265,20 @@ impl Taker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
tasks.add(connect(

Loading…
Cancel
Save