Browse Source

Merge #645

645: Introduce projection actor r=bonomat a=bonomat

Resolves #572 

Co-authored-by: bonomat <philipp@hoenisch.at>
reconnect-to-maker
bors[bot] 3 years ago
committed by GitHub
parent
commit
ec23aadd49
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      daemon/src/bitmex_price_feed.rs
  2. 43
      daemon/src/cfd_actors.rs
  3. 55
      daemon/src/db.rs
  4. 34
      daemon/src/lib.rs
  5. 36
      daemon/src/maker.rs
  6. 83
      daemon/src/maker_cfd.rs
  7. 47
      daemon/src/projection.rs
  8. 36
      daemon/src/taker.rs
  9. 74
      daemon/src/taker_cfd.rs
  10. 70
      daemon/tests/harness/mod.rs

12
daemon/src/bitmex_price_feed.rs

@ -1,15 +1,18 @@
use crate::model::{Price, Timestamp};
use crate::projection;
use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal;
use std::convert::TryFrom;
use std::future::Future;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite;
use xtra::prelude::MessageChannel;
/// Connects to the BitMex price feed, returning the polling task and a watch channel that will
/// always hold the last quote.
pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)> {
pub async fn new(
msg_channel: impl MessageChannel<projection::Update<Quote>>,
) -> Result<(impl Future<Output = ()>, Quote)> {
let (connection, _) = tokio_tungstenite::connect_async(
"wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD",
)
@ -24,11 +27,10 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
tracing::info!("Connected to BitMex realtime API");
let first_quote = quotes.select_next_some().await?;
let (sender, receiver) = watch::channel(first_quote);
let task = async move {
while let Ok(Some(quote)) = quotes.try_next().await {
if sender.send(quote).is_err() {
if msg_channel.send(projection::Update(quote)).await.is_err() {
break; // If the receiver dies, we can exit the loop.
}
}
@ -36,7 +38,7 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
tracing::warn!("Failed to read quote from websocket");
};
Ok((task, receiver))
Ok((task, first_quote))
}
#[derive(Clone, Debug)]

43
daemon/src/cfd_actors.rs

@ -1,35 +1,30 @@
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, OrderId};
use crate::{db, monitor, oracle, try_continue, wallet};
use crate::{db, monitor, oracle, projection, try_continue, wallet};
use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use tokio::sync::watch;
pub async fn insert_cfd_and_send_to_feed(
cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() {
bail!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
)
}
db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
projection_address
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(())
}
pub async fn append_cfd_state(
cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
db::append_cfd_state(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?;
projection_address
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(())
}
@ -37,7 +32,7 @@ pub async fn try_cet_publication<W>(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -54,7 +49,7 @@ where
bail!("If we can get the CET we should be able to transition")
}
append_cfd_state(cfd, conn, update_sender).await?;
append_cfd_state(cfd, conn, projection_address).await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
@ -69,7 +64,7 @@ pub async fn handle_monitoring_event<W>(
event: monitor::Event,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -84,10 +79,10 @@ where
return Ok(());
}
append_cfd_state(&cfd, conn, update_sender).await?;
append_cfd_state(&cfd, conn, projection_address).await?;
if let CfdState::OpenCommitted { .. } = cfd.state {
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
try_cet_publication(&mut cfd, conn, wallet, projection_address).await?;
} else if let CfdState::PendingRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet
@ -106,7 +101,7 @@ pub async fn handle_commit<W>(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -126,7 +121,7 @@ where
bail!("If we can get the commit tx we should be able to transition")
}
append_cfd_state(&cfd, conn, update_sender).await?;
append_cfd_state(&cfd, conn, projection_address).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
@ -136,7 +131,7 @@ pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -169,8 +164,8 @@ where
continue;
}
try_continue!(append_cfd_state(cfd, conn, update_sender).await);
try_continue!(try_cet_publication(cfd, conn, wallet, update_sender)
try_continue!(append_cfd_state(cfd, conn, projection_address).await);
try_continue!(try_cet_publication(cfd, conn, wallet, projection_address)
.await
.context("Error when trying to publish CET"));
}

55
daemon/src/db.rs

@ -1,6 +1,6 @@
use crate::model::cfd::{Cfd, CfdState, Order, OrderId};
use crate::model::{BitMexPriceEventId, Usd};
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use rust_decimal::Decimal;
use sqlx::pool::PoolConnection;
use sqlx::{Sqlite, SqlitePool};
@ -97,6 +97,13 @@ pub async fn load_order_by_id(
}
pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() {
bail!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
)
}
let state = serde_json::to_string(&cfd.state)?;
let query_result = sqlx::query(
r#"
@ -522,16 +529,14 @@ pub async fn load_cfds_by_oracle_event_id(
#[cfg(test)]
mod tests {
use crate::cfd_actors;
use pretty_assertions::assert_eq;
use rand::Rng;
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use time::macros::datetime;
use time::OffsetDateTime;
use tokio::sync::watch;
use crate::db::{self, insert_order};
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, Order, Origin};
use crate::model::{Price, Usd};
@ -557,32 +562,6 @@ mod tests {
assert_eq!(vec![cfd], loaded);
}
#[tokio::test]
async fn test_insert_like_cfd_actor() {
let mut conn = setup_test_db().await;
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
assert_eq!(cfd_feed_receiver.borrow().clone(), vec![]);
let cfd_1 = Cfd::dummy();
db::insert_order(&cfd_1.order, &mut conn).await.unwrap();
cfd_actors::insert_cfd_and_send_to_feed(&cfd_1, &mut conn, &cfd_feed_sender)
.await
.unwrap();
assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1.clone()]);
let cfd_2 = Cfd::dummy();
db::insert_order(&cfd_2.order, &mut conn).await.unwrap();
cfd_actors::insert_cfd_and_send_to_feed(&cfd_2, &mut conn, &cfd_feed_sender)
.await
.unwrap();
assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1, cfd_2]);
}
#[tokio::test]
async fn test_insert_and_load_cfd_by_order_id() {
let mut conn = setup_test_db().await;
@ -716,6 +695,22 @@ mod tests {
assert_eq!(data.len(), 100);
}
#[tokio::test]
async fn inserting_two_cfds_with_same_order_id_should_fail() {
let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await;
let error = insert_cfd(&cfd, &mut conn).await.err().unwrap();
assert_eq!(
error.to_string(),
format!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
)
);
}
fn random_simple_state() -> CfdState {
match rand::thread_rng().gen_range(0, 5) {
0 => CfdState::outgoing_order_request(),

34
daemon/src/lib.rs

@ -10,7 +10,6 @@ use connection::ConnectionStatus;
use futures::future::RemoteHandle;
use maia::secp256k1_zkp::schnorrsig;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
use tokio::sync::watch;
@ -36,6 +35,7 @@ mod noise;
pub mod olivia;
pub mod oracle;
pub mod payout_curve;
pub mod projection;
pub mod routes;
pub mod seed;
pub mod send_to_socket;
@ -75,9 +75,6 @@ impl Default for Tasks {
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub inc_conn_addr: Address<T>,
pub tasks: Tasks,
}
@ -111,6 +108,7 @@ where
) -> T,
settlement_time_interval_hours: time::Duration,
n_payouts: usize,
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -119,11 +117,6 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
@ -135,9 +128,7 @@ where
wallet_addr,
settlement_time_interval_hours,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
@ -182,9 +173,6 @@ where
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr,
tasks,
})
@ -194,9 +182,6 @@ where
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub connection_actor_addr: Address<connection::Actor>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
pub tasks: Tasks,
}
@ -225,6 +210,7 @@ where
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
n_payouts: usize,
maker_heartbeat_interval: Duration,
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -233,11 +219,6 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (maker_online_status_feed_sender, maker_online_status_feed_receiver) =
watch::channel(ConnectionStatus::Offline);
@ -251,9 +232,7 @@ where
db,
wallet_addr,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
Box::new(connection_actor_addr.clone()),
monitor_addr.clone(),
oracle_addr,
@ -301,9 +280,6 @@ where
Ok(Self {
cfd_actor_addr,
connection_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
maker_online_status_feed_receiver,
tasks,
})

36
daemon/src/maker.rs

@ -4,20 +4,25 @@ use bdk::bitcoin::Amount;
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME};
use daemon::bitmex_price_feed::Quote;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::Actor;
@ -225,9 +230,6 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tasks.add(task);
let db = SqlitePool::connect_with(
SqliteConnectOptions::new()
.create_if_missing(true)
@ -247,11 +249,11 @@ async fn main() -> Result<()> {
let settlement_time_interval_hours =
time::Duration::hours(opts.settlement_time_interval_hours as i64);
let (projection_actor, projection_context) = xtra::Context::new(None);
let MakerActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr: incoming_connection_addr,
tasks: _tasks,
} = MakerActorSystem::new(
@ -270,9 +272,27 @@ async fn main() -> Result<()> {
},
time::Duration::hours(opts.settlement_time_interval_hours as i64),
N_PAYOUTS,
projection_actor.clone(),
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
@ -299,7 +319,7 @@ async fn main() -> Result<()> {
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(auth_password)
.manage(quote_updates)
.manage(quote_receiver)
.manage(bitcoin_network)
.mount(
"/api",

83
daemon/src/maker_cfd.rs

@ -2,12 +2,15 @@ use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
};
use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams;
use crate::tokio_ext::FutureExt;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use crate::{
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire,
UpdateCfdProposals,
};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -19,7 +22,6 @@ use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::HashMap;
use time::Duration;
use tokio::sync::watch;
use xtra::prelude::*;
pub enum CfdAction {
@ -62,9 +64,7 @@ pub struct Actor<O, M, T, W> {
wallet: Address<W>,
settlement_time_interval_hours: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
takers: Address<T>,
current_order_id: Option<OrderId>,
monitor_actor: Address<M>,
@ -102,9 +102,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
wallet: Address<W>,
settlement_time_interval_hours: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
@ -115,9 +113,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
wallet,
settlement_time_interval_hours,
oracle_pk,
cfd_feed_actor_inbox,
order_feed_sender,
update_cfd_feed_sender,
projection_actor,
takers,
current_order_id: None,
monitor_actor,
@ -165,7 +161,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
taker_id,
),
);
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -189,7 +185,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
taker_id,
),
);
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -235,21 +231,25 @@ impl<O, M, T, W> Actor<O, M, T, W> {
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals(&self) -> Result<()> {
Ok(self.update_cfd_feed_sender.send(
self.current_pending_proposals
async fn send_pending_proposals(&self) -> Result<()> {
let pending_proposal = self
.current_pending_proposals
.iter()
.map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone())))
.collect(),
)?)
.collect::<UpdateCfdProposals>();
let _ = self
.projection_actor
.send(projection::Update(pending_proposal))
.await?;
Ok(())
}
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_proposals()?;
self.send_pending_proposals().await?;
Ok(())
}
@ -283,12 +283,7 @@ where
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
@ -296,12 +291,7 @@ where
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -312,7 +302,7 @@ where
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
&self.projection_actor,
)
.await?;
Ok(())
@ -361,11 +351,13 @@ where
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
Err(e) => {
tracing::warn!("Failed to notify taker of accepted settlement: {}", e);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
}
@ -381,6 +373,7 @@ where
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
self.takers
@ -413,6 +406,7 @@ where
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected roll_over")?;
self.takers
@ -472,7 +466,7 @@ where
self.takers
.send(maker_inc_connections::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
// 3. Insert CFD in DB
let cfd = Cfd::new(
@ -485,7 +479,7 @@ where
taker_id,
},
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. check if order has acceptable amounts and if not reject the cfd
// Since rejection is tied to the cfd state at the moment we can only do this after creating
@ -533,7 +527,7 @@ where
) -> Result<()> {
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.takers
.send(maker_inc_connections::TakerMessage {
@ -598,7 +592,7 @@ where
// 4. Insert that we are in contract setup and refresh our own feed
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
// 5. Spawn away the contract setup
let (sender, receiver) = mpsc::unbounded();
@ -665,7 +659,7 @@ where
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
@ -677,7 +671,7 @@ where
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
@ -802,6 +796,7 @@ where
};
self.remove_pending_proposal(&order_id)
.await
.context("accepted roll_over")?;
Ok(())
}
@ -827,7 +822,7 @@ where
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
@ -887,7 +882,7 @@ where
own_script_pubkey.clone(),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
@ -976,7 +971,9 @@ where
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
self.projection_actor
.send(projection::Update(Some(order.clone())))
.await?;
// 4. Inform connected takers
self.takers

47
daemon/src/projection.rs

@ -0,0 +1,47 @@
use crate::bitmex_price_feed::Quote;
use crate::{Cfd, Order, UpdateCfdProposals};
use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
pub struct Actor {
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
}
impl Actor {
pub fn new(
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
) -> Self {
Self {
tx_cfds,
tx_order,
tx_quote,
tx_settlements,
}
}
}
pub struct Update<T>(pub T);
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Update<Vec<Cfd>>) {
let _ = self.tx_cfds.send(msg.0);
}
fn handle(&mut self, msg: Update<Option<Order>>) {
let _ = self.tx_order.send(msg.0);
}
fn handle(&mut self, msg: Update<Quote>) {
let _ = self.tx_quote.send(msg.0);
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx_settlements.send(msg.0);
}
}
impl xtra::Actor for Actor {}

36
daemon/src/taker.rs

@ -3,16 +3,20 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use bdk::bitcoin::{Address, Amount};
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::bitmex_price_feed::Quote;
use daemon::connection::ConnectionStatus;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, projection,
taker_cfd, wallet, wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
@ -20,6 +24,7 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use tracing_subscriber::filter::LevelFilter;
use watch::channel;
use xtra::prelude::MessageChannel;
use xtra::Actor;
@ -209,9 +214,6 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default();
let (task, quote_updates) = bitmex_price_feed::new().await?;
tasks.add(task);
let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port()));
@ -233,12 +235,11 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None);
let TakerActorSystem {
cfd_actor_addr,
connection_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
mut maker_online_status_feed_receiver,
tasks: _tasks,
} = TakerActorSystem::new(
@ -255,8 +256,25 @@ async fn main() -> Result<()> {
},
N_PAYOUTS,
HEARTBEAT_INTERVAL * 2,
projection_actor.clone(),
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
connect(connection_actor_addr, opts.maker_id, opts.maker).await?;
@ -271,7 +289,7 @@ async fn main() -> Result<()> {
.manage(cfd_action_channel)
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(quote_updates)
.manage(quote_receiver)
.manage(bitcoin_network)
.mount(
"/api",

74
daemon/src/taker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams};
use crate::tokio_ext::FutureExt;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wallet, wire};
use crate::{log_error, oracle, projection, setup_contract, wallet, wire};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -16,7 +16,6 @@ use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use std::collections::HashMap;
use tokio::sync::watch;
use xtra::prelude::*;
pub struct TakeOffer {
@ -67,9 +66,7 @@ pub struct Actor<O, M, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
setup_state: SetupState,
@ -90,9 +87,7 @@ where
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
projection_actor: Address<projection::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
@ -102,9 +97,7 @@ where
db,
wallet,
oracle_pk,
cfd_feed_actor_inbox,
order_feed_actor_inbox,
update_cfd_feed_sender,
projection_actor,
send_to_maker,
monitor_actor,
setup_state: SetupState::None,
@ -117,18 +110,19 @@ where
}
impl<O, M, W> Actor<O, M, W> {
fn send_pending_update_proposals(&self) -> Result<()> {
async fn send_pending_update_proposals(&self) -> Result<()> {
Ok(self
.update_cfd_feed_sender
.send(self.current_pending_proposals.clone())?)
.projection_actor
.send(projection::Update(self.current_pending_proposals.clone()))
.await?)
}
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
}
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
Ok(())
}
@ -158,12 +152,12 @@ impl<O, M, W> Actor<O, M, W> {
CfdState::outgoing_order_request(),
);
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?;
// Cleanup own order feed, after inserting the cfd.
// Due to the 1:1 relationship between order and cfd we can never create another cfd for the
// same order id.
self.order_feed_actor_inbox.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
self.send_to_maker
.send(wire::TakerToMaker::TakeOrder { order_id, quantity })
@ -181,12 +175,7 @@ where
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -218,7 +207,7 @@ where
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
self.send_to_maker
.send(wire::TakerToMaker::ProposeSettlement {
@ -241,7 +230,7 @@ where
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
self.remove_pending_proposal(&order_id).await?;
Ok(())
}
@ -250,6 +239,7 @@ where
tracing::info!(%order_id, "Roll over proposal got rejected");
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
Ok(())
@ -295,7 +285,7 @@ where
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
@ -319,10 +309,12 @@ impl<O, M, W> Actor<O, M, W> {
insert_order(&order, &mut conn).await?;
}
self.order_feed_actor_inbox.send(Some(order))?;
self.projection_actor
.send(projection::Update(Some(order)))
.await?;
}
None => {
self.order_feed_actor_inbox.send(None)?;
self.projection_actor.send(projection::Update(None)).await?;
}
}
Ok(())
@ -339,7 +331,7 @@ where
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
&self.projection_actor,
)
.await?;
Ok(())
@ -347,12 +339,7 @@ where
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
@ -379,7 +366,7 @@ where
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals()?;
self.send_pending_update_proposals().await?;
self.send_to_maker
.send(wire::TakerToMaker::ProposeRollOver {
@ -421,7 +408,7 @@ where
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
@ -435,7 +422,7 @@ where
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
@ -490,7 +477,7 @@ where
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let offer_announcement = self
.oracle_actor
@ -603,6 +590,7 @@ where
};
self.remove_pending_proposal(&order_id)
.await
.context("Could not remove accepted roll over")?;
Ok(())
}
@ -629,7 +617,7 @@ where
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
@ -680,9 +668,9 @@ where
dlc.script_pubkey_for(cfd.role()),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.remove_pending_proposal(&order_id)?;
self.remove_pending_proposal(&order_id).await?;
self.monitor_actor
.send(monitor::CollaborativeSettlement {

70
daemon/tests/harness/mod.rs

@ -2,19 +2,24 @@ use crate::harness::mocks::monitor::MonitorActor;
use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{Connect, ConnectionStatus};
use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::{Price, Usd};
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::{Price, Timestamp, Usd};
use daemon::seed::Seed;
use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem, Tasks};
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
};
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt;
@ -47,16 +52,18 @@ pub struct Maker {
pub mocks: mocks::Mocks,
pub listen_addr: SocketAddr,
pub identity_pk: x25519_dalek::PublicKey,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
_tasks: Tasks,
}
impl Maker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.system.cfd_feed_receiver
&mut self.cfd_feed_receiver
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.system.order_feed_receiver
&mut self.order_feed_receiver
}
pub async fn start(oracle_pk: schnorrsig::PublicKey) -> Self {
@ -75,6 +82,8 @@ impl Maker {
let seed = Seed::default();
let (identity_pk, identity_sk) = seed.derive_identity();
let (projection_actor, projection_context) = xtra::Context::new(None);
// system startup sends sync messages, mock them
mocks.mock_sync_handlers().await;
let maker = daemon::MakerActorSystem::new(
@ -93,10 +102,30 @@ impl Maker {
},
settlement_time_interval_hours,
N_PAYOUTS_FOR_TEST,
projection_actor.clone(),
)
.await
.unwrap();
let dummy_quote = Quote {
timestamp: Timestamp::now().unwrap(),
bid: Price::new(dec!(10000)).unwrap(),
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
@ -120,6 +149,8 @@ impl Maker {
listen_addr: address,
mocks,
_tasks: tasks,
cfd_feed_receiver,
order_feed_receiver,
}
}
@ -157,16 +188,18 @@ impl Maker {
pub struct Taker {
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
_tasks: Tasks,
}
impl Taker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.system.cfd_feed_receiver
&mut self.cfd_feed_receiver
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.system.order_feed_receiver
&mut self.order_feed_receiver
}
pub fn maker_status_feed(&mut self) -> &mut watch::Receiver<ConnectionStatus> {
@ -192,6 +225,8 @@ impl Taker {
let (wallet_addr, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut);
let (projection_actor, projection_context) = xtra::Context::new(None);
// system startup sends sync messages, mock them
mocks.mock_sync_handlers().await;
let taker = daemon::TakerActorSystem::new(
@ -203,10 +238,29 @@ impl Taker {
|_, _| async { Ok(monitor) },
N_PAYOUTS_FOR_TEST,
HEARTBEAT_INTERVAL_FOR_TEST * 2,
projection_actor,
)
.await
.unwrap();
let dummy_quote = Quote {
timestamp: Timestamp::now().unwrap(),
bid: Price::new(dec!(10000)).unwrap(),
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
)));
taker
.connection_actor_addr
.send(Connect {
@ -221,6 +275,8 @@ impl Taker {
system: taker,
mocks,
_tasks: tasks,
order_feed_receiver,
cfd_feed_receiver,
}
}

Loading…
Cancel
Save