Browse Source

Introduce projection actor

The projection actor is responsible in preparing data for the HTTP API and consequently for the UI.
While this is commit provides only the foundation, in the long run we can:

- Reduce the logic happening in the rocket layer. `ToSseEvent` can likely go away.
- Reduce the complexity for other actors in what needs to be updated when. All they should care about is sending updates to the projection actor. (mostly done)
- Improve test coverage. With a dedicated actor that does the projection, we should be able to write assertions in our integration tests that are closer to the UI.
reconnect-to-maker
bonomat 3 years ago
parent
commit
bf36f290dd
No known key found for this signature in database GPG Key ID: E5F8E74C672BC666
  1. 12
      daemon/src/bitmex_price_feed.rs
  2. 35
      daemon/src/cfd_actors.rs
  3. 4
      daemon/src/db.rs
  4. 34
      daemon/src/lib.rs
  5. 36
      daemon/src/maker.rs
  6. 83
      daemon/src/maker_cfd.rs
  7. 47
      daemon/src/projection.rs
  8. 36
      daemon/src/taker.rs
  9. 74
      daemon/src/taker_cfd.rs
  10. 70
      daemon/tests/harness/mod.rs

12
daemon/src/bitmex_price_feed.rs

@ -1,15 +1,18 @@
use crate::model::{Price, Timestamp};
use crate::projection;
use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal;
use std::convert::TryFrom;
use std::future::Future;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite;
use xtra::prelude::MessageChannel;
/// Connects to the BitMex price feed, returning the polling task and a watch channel that will
/// always hold the last quote.
pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)> {
pub async fn new(
msg_channel: impl MessageChannel<projection::Update<Quote>>,
) -> Result<(impl Future<Output = ()>, Quote)> {
let (connection, _) = tokio_tungstenite::connect_async(
"wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD",
)
@ -24,11 +27,10 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
tracing::info!("Connected to BitMex realtime API");
let first_quote = quotes.select_next_some().await?;
let (sender, receiver) = watch::channel(first_quote);
let task = async move {
while let Ok(Some(quote)) = quotes.try_next().await {
if sender.send(quote).is_err() {
if msg_channel.send(projection::Update(quote)).await.is_err() {
break; // If the receiver dies, we can exit the loop.
}
}
@ -36,7 +38,7 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
tracing::warn!("Failed to read quote from websocket");
};
Ok((task, receiver))
Ok((task, first_quote))
}
#[derive(Clone, Debug)]

35
daemon/src/cfd_actors.rs

@ -1,27 +1,30 @@
use crate::model::cfd::{Attestation, Cfd, CfdState, OrderId};
use crate::{db, monitor, oracle, try_continue, wallet};
use crate::{db, monitor, oracle, projection, try_continue, wallet};
use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use tokio::sync::watch;
pub async fn insert_cfd_and_send_to_feed(
cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
projection_address
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(())
}
pub async fn append_cfd_state(
cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
db::append_cfd_state(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
projection_address
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(())
}
@ -29,7 +32,7 @@ pub async fn try_cet_publication<W>(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -46,7 +49,7 @@ where
bail!("If we can get the CET we should be able to transition")
}
append_cfd_state(cfd, conn, update_sender).await?;
append_cfd_state(cfd, conn, projection_address).await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
@ -61,7 +64,7 @@ pub async fn handle_monitoring_event<W>(
event: monitor::Event,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -76,10 +79,10 @@ where
return Ok(());
}
append_cfd_state(&cfd, conn, update_sender).await?;
append_cfd_state(&cfd, conn, projection_address).await?;
if let CfdState::OpenCommitted { .. } = cfd.state {
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
try_cet_publication(&mut cfd, conn, wallet, projection_address).await?;
} else if let CfdState::PendingRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet
@ -98,7 +101,7 @@ pub async fn handle_commit<W>(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -118,7 +121,7 @@ where
bail!("If we can get the commit tx we should be able to transition")
}
append_cfd_state(&cfd, conn, update_sender).await?;
append_cfd_state(&cfd, conn, projection_address).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
@ -128,7 +131,7 @@ pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -161,8 +164,8 @@ where
continue;
}
try_continue!(append_cfd_state(cfd, conn, update_sender).await);
try_continue!(try_cet_publication(cfd, conn, wallet, update_sender)
try_continue!(append_cfd_state(cfd, conn, projection_address).await);
try_continue!(try_cet_publication(cfd, conn, wallet, projection_address)
.await
.context("Error when trying to publish CET"));
}

4
daemon/src/db.rs

@ -529,16 +529,14 @@ pub async fn load_cfds_by_oracle_event_id(
#[cfg(test)]
mod tests {
use crate::cfd_actors;
use pretty_assertions::assert_eq;
use rand::Rng;
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use time::macros::datetime;
use time::OffsetDateTime;
use tokio::sync::watch;
use crate::db::{self, insert_order};
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, Order, Origin};
use crate::model::{Price, Usd};

34
daemon/src/lib.rs

@ -10,7 +10,6 @@ use connection::ConnectionStatus;
use futures::future::RemoteHandle;
use maia::secp256k1_zkp::schnorrsig;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
use tokio::sync::watch;
@ -36,6 +35,7 @@ mod noise;
pub mod olivia;
pub mod oracle;
pub mod payout_curve;
pub mod projection;
pub mod routes;
pub mod seed;
pub mod send_to_socket;
@ -75,9 +75,6 @@ impl Default for Tasks {
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub inc_conn_addr: Address<T>,
pub tasks: Tasks,
}
@ -111,6 +108,7 @@ where
) -> T,
settlement_time_interval_hours: time::Duration,
n_payouts: usize,
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -119,11 +117,6 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
@ -135,9 +128,7 @@ where
wallet_addr,
settlement_time_interval_hours,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
@ -182,9 +173,6 @@ where
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr,
tasks,
})
@ -194,9 +182,6 @@ where
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub connection_actor_addr: Address<connection::Actor>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
pub tasks: Tasks,
}
@ -225,6 +210,7 @@ where
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
n_payouts: usize,
maker_heartbeat_interval: Duration,
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -233,11 +219,6 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (maker_online_status_feed_sender, maker_online_status_feed_receiver) =
watch::channel(ConnectionStatus::Offline);
@ -251,9 +232,7 @@ where
db,
wallet_addr,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
Box::new(connection_actor_addr.clone()),
monitor_addr.clone(),
oracle_addr,
@ -301,9 +280,6 @@ where
Ok(Self {
cfd_actor_addr,
connection_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
maker_online_status_feed_receiver,
tasks,
})

36
daemon/src/maker.rs

@ -4,20 +4,25 @@ use bdk::bitcoin::Amount;
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME};
use daemon::bitmex_price_feed::Quote;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::Actor;
@ -225,9 +230,6 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tasks.add(task);
let db = SqlitePool::connect_with(
SqliteConnectOptions::new()
.create_if_missing(true)
@ -247,11 +249,11 @@ async fn main() -> Result<()> {
let settlement_time_interval_hours =
time::Duration::hours(opts.settlement_time_interval_hours as i64);
let (projection_actor, projection_context) = xtra::Context::new(None);
let MakerActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr: incoming_connection_addr,
tasks: _tasks,
} = MakerActorSystem::new(
@ -270,9 +272,27 @@ async fn main() -> Result<()> {
},
time::Duration::hours(opts.settlement_time_interval_hours as i64),
N_PAYOUTS,
projection_actor.clone(),
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
@ -299,7 +319,7 @@ async fn main() -> Result<()> {
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(auth_password)
.manage(quote_updates)
.manage(quote_receiver)
.manage(bitcoin_network)
.mount(
"/api",

83
daemon/src/maker_cfd.rs

@ -2,12 +2,15 @@ use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
};
use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams;
use crate::tokio_ext::FutureExt;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use crate::{
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire,
UpdateCfdProposals,
};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -19,7 +22,6 @@ use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::HashMap;
use time::Duration;
use tokio::sync::watch;
use xtra::prelude::*;
pub enum CfdAction {
@ -62,9 +64,7 @@ pub struct Actor<O, M, T, W> {
wallet: Address<W>,
settlement_time_interval_hours: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
takers: Address<T>,
current_order_id: Option<OrderId>,
monitor_actor: Address<M>,
@ -102,9 +102,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
wallet: Address<W>,
settlement_time_interval_hours: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
@ -115,9 +113,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
wallet,
settlement_time_interval_hours,
oracle_pk,
cfd_feed_actor_inbox,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
takers,
current_order_id: None,
monitor_actor,
@ -165,7 +161,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
taker_id,
),
);
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -189,7 +185,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
taker_id,
),
);
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -235,21 +231,25 @@ impl<O, M, T, W> Actor<O, M, T, W> {
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals(&self) -> Result<()> {
Ok(self.update_cfd_feed_sender.send(
self.current_pending_proposals
async fn send_pending_proposals(&self) -> Result<()> {
let pending_proposal = self
.current_pending_proposals
.iter()
.map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone())))
.collect(),
)?)
.collect::<UpdateCfdProposals>();
let _ = self
.projection_actor
.send(projection::Update(pending_proposal))
.await?;
Ok(())
}
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -283,12 +283,7 @@ where
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
@ -296,12 +291,7 @@ where
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -312,7 +302,7 @@ where
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
&self.projection_actor,
)
.await?;
Ok(())
@ -361,11 +351,13 @@ where
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
Err(e) => {
tracing::warn!("Failed to notify taker of accepted settlement: {}", e);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
}
@ -381,6 +373,7 @@ where
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
self.takers
@ -413,6 +406,7 @@ where
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected roll_over")?;
self.takers
@ -472,7 +466,7 @@ where
self.takers
.send(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
// 3. Insert CFD in DB
let cfd = Cfd::new(
@ -485,7 +479,7 @@ where
taker_id,
},
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. check if order has acceptable amounts and if not reject the cfd
// Since rejection is tied to the cfd state at the moment we can only do this after creating
@ -533,7 +527,7 @@ where
) -> Result<()> {
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.takers
.send(maker_inc_connections::TakerMessage {
@ -598,7 +592,7 @@ where
// 4. Insert that we are in contract setup and refresh our own feed
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
// 5. Spawn away the contract setup
let (sender, receiver) = mpsc::unbounded();
@ -665,7 +659,7 @@ where
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
@ -677,7 +671,7 @@ where
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
@ -802,6 +796,7 @@ where
};
self.remove_pending_proposal(&order_id)
.await
.context("accepted roll_over")?;
Ok(())
}
@ -827,7 +822,7 @@ where
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
@ -887,7 +882,7 @@ where
own_script_pubkey.clone(),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
@ -976,7 +971,9 @@ where
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
self.projection_actor
.send(projection::Update(Some(order.clone())))
.await?;
// 4. Inform connected takers
self.takers

47
daemon/src/projection.rs

@ -0,0 +1,47 @@
use crate::bitmex_price_feed::Quote;
use crate::{Cfd, Order, UpdateCfdProposals};
use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
pub struct Actor {
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
}
impl Actor {
pub fn new(
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
) -> Self {
Self {
tx_cfds,
tx_order,
tx_quote,
tx_settlements,
}
}
}
pub struct Update<T>(pub T);
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Update<Vec<Cfd>>) {
let _ = self.tx_cfds.send(msg.0);
}
fn handle(&mut self, msg: Update<Option<Order>>) {
let _ = self.tx_order.send(msg.0);
}
fn handle(&mut self, msg: Update<Quote>) {
let _ = self.tx_quote.send(msg.0);
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx_settlements.send(msg.0);
}
}
impl xtra::Actor for Actor {}

36
daemon/src/taker.rs

@ -3,16 +3,20 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use bdk::bitcoin::{Address, Amount};
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::bitmex_price_feed::Quote;
use daemon::connection::ConnectionStatus;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, projection,
taker_cfd, wallet, wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
@ -20,6 +24,7 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use tracing_subscriber::filter::LevelFilter;
use watch::channel;
use xtra::prelude::MessageChannel;
use xtra::Actor;
@ -209,9 +214,6 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tasks.add(task);
let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port()));
@ -233,12 +235,11 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None);
let TakerActorSystem {
cfd_actor_addr,
connection_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
mut maker_online_status_feed_receiver,
tasks: _tasks,
} = TakerActorSystem::new(
@ -255,8 +256,25 @@ async fn main() -> Result<()> {
},
N_PAYOUTS,
HEARTBEAT_INTERVAL * 2,
projection_actor.clone(),
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
connect(connection_actor_addr, opts.maker_id, opts.maker).await?;
@ -271,7 +289,7 @@ async fn main() -> Result<()> {
.manage(cfd_action_channel)
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(quote_updates)
.manage(quote_receiver)
.manage(bitcoin_network)
.mount(
"/api",

74
daemon/src/taker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams};
use crate::tokio_ext::FutureExt;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wallet, wire};
use crate::{log_error, oracle, projection, setup_contract, wallet, wire};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -16,7 +16,6 @@ use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use std::collections::HashMap;
use tokio::sync::watch;
use xtra::prelude::*;
pub struct TakeOffer {
@ -67,9 +66,7 @@ pub struct Actor<O, M, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
setup_state: SetupState,
@ -90,9 +87,7 @@ where
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
@ -102,9 +97,7 @@ where
db,
wallet,
oracle_pk,
cfd_feed_actor_inbox,
order_feed_actor_inbox,
update_cfd_feed_sender,
projection_actor,
send_to_maker,
monitor_actor,
setup_state: SetupState::None,
@ -117,18 +110,19 @@ where
}
impl<O, M, W> Actor<O, M, W> {
fn send_pending_update_proposals(&self) -> Result<()> {
async fn send_pending_update_proposals(&self) -> Result<()> {
Ok(self
.update_cfd_feed_sender
.send(self.current_pending_proposals.clone())?)
.projection_actor
.send(projection::Update(self.current_pending_proposals.clone()))
.await?)
}
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
Ok(())
}
@ -158,12 +152,12 @@ impl<O, M, W> Actor<O, M, W> {
CfdState::outgoing_order_request(),
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// Cleanup own order feed, after inserting the cfd.
// Due to the 1:1 relationship between order and cfd we can never create another cfd for the
// same order id.
self.order_feed_actor_inbox.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
self.send_to_maker
.send(wire::TakerToMaker::TakeOrder { order_id, quantity })
@ -181,12 +175,7 @@ where
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -218,7 +207,7 @@ where
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
self.send_to_maker
.send(wire::TakerToMaker::ProposeSettlement {
@ -241,7 +230,7 @@ where
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
self.remove_pending_proposal(&order_id).await?;
Ok(())
}
@ -250,6 +239,7 @@ where
tracing::info!(%order_id, "Roll over proposal got rejected");
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
Ok(())
@ -295,7 +285,7 @@ where
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
@ -319,10 +309,12 @@ impl<O, M, W> Actor<O, M, W> {
insert_order(&order, &mut conn).await?;
}
self.order_feed_actor_inbox.send(Some(order))?;
self.projection_actor
.send(projection::Update(Some(order)))
.await?;
}
None => {
self.order_feed_actor_inbox.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
}
}
Ok(())
@ -339,7 +331,7 @@ where
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
&self.projection_actor,
)
.await?;
Ok(())
@ -347,12 +339,7 @@ where
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -379,7 +366,7 @@ where
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
self.send_to_maker
.send(wire::TakerToMaker::ProposeRollOver {
@ -421,7 +408,7 @@ where
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
@ -435,7 +422,7 @@ where
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
@ -490,7 +477,7 @@ where
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let offer_announcement = self
.oracle_actor
@ -603,6 +590,7 @@ where
};
self.remove_pending_proposal(&order_id)
.await
.context("Could not remove accepted roll over")?;
Ok(())
}
@ -629,7 +617,7 @@ where
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
@ -680,9 +668,9 @@ where
dlc.script_pubkey_for(cfd.role()),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.remove_pending_proposal(&order_id)?;
self.remove_pending_proposal(&order_id).await?;
self.monitor_actor
.send(monitor::CollaborativeSettlement {

70
daemon/tests/harness/mod.rs

@ -2,19 +2,24 @@ use crate::harness::mocks::monitor::MonitorActor;
use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{Connect, ConnectionStatus};
use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::{Price, Usd};
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::{Price, Timestamp, Usd};
use daemon::seed::Seed;
use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem, Tasks};
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
};
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt;
@ -47,16 +52,18 @@ pub struct Maker {
pub mocks: mocks::Mocks,
pub listen_addr: SocketAddr,
pub identity_pk: x25519_dalek::PublicKey,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
_tasks: Tasks,
}
impl Maker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.system.cfd_feed_receiver
&mut self.cfd_feed_receiver
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.system.order_feed_receiver
&mut self.order_feed_receiver
}
pub async fn start(oracle_pk: schnorrsig::PublicKey) -> Self {
@ -75,6 +82,8 @@ impl Maker {
let seed = Seed::default();
let (identity_pk, identity_sk) = seed.derive_identity();
let (projection_actor, projection_context) = xtra::Context::new(None);
// system startup sends sync messages, mock them
mocks.mock_sync_handlers().await;
let maker = daemon::MakerActorSystem::new(
@ -93,10 +102,30 @@ impl Maker {
},
settlement_time_interval_hours,
N_PAYOUTS_FOR_TEST,
projection_actor.clone(),
)
.await
.unwrap();
let dummy_quote = Quote {
timestamp: Timestamp::now().unwrap(),
bid: Price::new(dec!(10000)).unwrap(),
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
@ -120,6 +149,8 @@ impl Maker {
listen_addr: address,
mocks,
_tasks: tasks,
cfd_feed_receiver,
order_feed_receiver,
}
}
@ -157,16 +188,18 @@ impl Maker {
pub struct Taker {
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
_tasks: Tasks,
}
impl Taker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.system.cfd_feed_receiver
&mut self.cfd_feed_receiver
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.system.order_feed_receiver
&mut self.order_feed_receiver
}
pub fn maker_status_feed(&mut self) -> &mut watch::Receiver<ConnectionStatus> {
@ -192,6 +225,8 @@ impl Taker {
let (wallet_addr, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut);
let (projection_actor, projection_context) = xtra::Context::new(None);
// system startup sends sync messages, mock them
mocks.mock_sync_handlers().await;
let taker = daemon::TakerActorSystem::new(
@ -203,10 +238,29 @@ impl Taker {
|_, _| async { Ok(monitor) },
N_PAYOUTS_FOR_TEST,
HEARTBEAT_INTERVAL_FOR_TEST * 2,
projection_actor,
)
.await
.unwrap();
let dummy_quote = Quote {
timestamp: Timestamp::now().unwrap(),
bid: Price::new(dec!(10000)).unwrap(),
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
taker
.connection_actor_addr
.send(Connect {
@ -221,6 +275,8 @@ impl Taker {
system: taker,
mocks,
_tasks: tasks,
order_feed_receiver,
cfd_feed_receiver,
}
}

Loading…
Cancel
Save