Browse Source

Merge #353

353: Make `taker_cfd::Actor` generic over other actor addresses r=luckysori a=luckysori

Equivalent to what we did to `maker_cfd::Actor` in #339.

Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
debug-statements
bors[bot] 3 years ago
committed by GitHub
parent
commit
86c0e2c35f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Cargo.lock
  2. 2
      Cargo.toml
  3. 2
      daemon/src/taker.rs
  4. 434
      daemon/src/taker_cfd.rs

2
Cargo.lock

@ -3353,7 +3353,7 @@ dependencies = [
[[package]]
name = "xtra"
version = "0.6.0"
source = "git+https://github.com/Restioson/xtra#7ae9140dbc30127aaa5f701078b72ac143babeb4"
source = "git+https://github.com/luckysori/xtra?rev=19684c196f71ec762e595906cc88c6142b82092e#19684c196f71ec762e595906cc88c6142b82092e"
dependencies = [
"async-trait",
"barrage",

2
Cargo.toml

@ -4,4 +4,4 @@ resolver = "2"
[patch.crates-io]
rocket = { git = "https://github.com/SergioBenitez/Rocket" } # Need to patch rocket dependency of `rocket_basicauth` until there is an official release.
xtra = { git = "https://github.com/Restioson/xtra" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us.
xtra = { git = "https://github.com/luckysori/xtra", rev = "19684c196f71ec762e595906cc88c6142b82092e" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us.

2
daemon/src/taker.rs

@ -221,7 +221,7 @@ async fn main() -> Result<()> {
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
Box::new(send_to_maker),
monitor_actor_address.clone(),
oracle_actor_address.clone(),
)

434
daemon/src/taker_cfd.rs

@ -9,7 +9,7 @@ use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, send_to_socket, setup_contract, wire};
use crate::{log_error, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -67,22 +67,22 @@ enum RollOverState {
None,
}
pub struct Actor {
pub struct Actor<O, M> {
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<M>,
setup_state: SetupState,
roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor>,
oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals,
}
impl Actor {
impl<O, M> Actor<O, M> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
@ -91,9 +91,9 @@ impl Actor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor>,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker> + Send>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
) -> Self {
Self {
db,
@ -155,8 +155,7 @@ impl Actor {
insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?;
.do_send(wire::TakerToMaker::TakeOrder { order_id, quantity })?;
Ok(())
}
@ -191,14 +190,13 @@ impl Actor {
self.send_pending_update_proposals()?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::ProposeSettlement {
.do_send(wire::TakerToMaker::ProposeSettlement {
order_id: proposal.order_id,
timestamp: proposal.timestamp,
taker: proposal.taker,
maker: proposal.maker,
price: proposal.price,
})
.await?;
})?;
Ok(())
}
@ -222,14 +220,110 @@ impl Actor {
self.send_pending_update_proposals()?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::ProposeRollOver {
.do_send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id,
timestamp: proposal.timestamp,
})
.await?;
})?;
Ok(())
}
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
Ok(())
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Roll over request rejected");
// TODO: tell UI that roll over was rejected
// this is not too bad as we are still monitoring for the CFD to expiry
// the taker can just try to ask again :)
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
sender.send(msg).await?;
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup")
}
}
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M> Actor<O, M>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
{
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
match order {
Some(mut order) => {
@ -249,7 +343,62 @@ impl Actor {
}
Ok(())
}
}
impl<O, M> Actor<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let dlc = dlc.context("Failed to setup contract with maker")?;
tracing::info!("Setup complete, publishing on chain now");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
})
.await?;
Ok(())
}
}
impl<O: 'static, M: 'static> Actor<O, M>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_order_accepted(
&mut self,
order_id: OrderId,
@ -283,8 +432,8 @@ impl Actor {
let contract_future = setup_contract::new(
self.send_to_maker
.clone()
.into_sink()
.sink()
.clone_message_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver,
(self.oracle_pk, offer_announcement),
@ -308,62 +457,13 @@ impl Actor {
Ok(())
}
}
async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
Ok(())
}
async fn handle_settlement_accepted(
&mut self,
order_id: OrderId,
_ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got accepted");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker
.do_send_async(wire::TakerToMaker::InitiateSettlement {
order_id,
sig_taker,
})
.await?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(())
}
impl<O: 'static, M: 'static> Actor<O, M>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_roll_over_accepted(
&mut self,
order_id: OrderId,
@ -391,8 +491,7 @@ impl Actor {
let contract_future = setup_contract::roll_over(
self.send_to_maker
.clone()
.into_sink()
.sink()
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver,
(self.oracle_pk, announcement),
@ -418,78 +517,31 @@ impl Actor {
.context("Could not remove accepted roll over")?;
Ok(())
}
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id)?;
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Roll over request rejected");
// TODO: tell UI that roll over was rejected
// this is not too bad as we are still monitoring for the CFD to expiry
// the taker can just try to ask again :)
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
sender.send(msg).await?;
}
SetupState::None => {
anyhow::bail!("Received setup message without an active contract setup")
}
}
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
async fn handle_cfd_setup_completed(
impl<O: 'static, M: 'static> Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let dlc = dlc.context("Failed to setup contract with maker")?;
tracing::info!("Setup complete, publishing on chain now");
let dlc = dlc.context("Failed to roll over contract with maker")?;
self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::PendingOpen {
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
@ -497,90 +549,66 @@ impl Actor {
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: cfd.order.oracle_event_id,
})
.await?;
Ok(())
}
}
async fn handle_cfd_roll_over_completed(
impl<O: 'static, M: 'static> Actor<O, M>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_settlement_accepted(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
_ctx: &mut Context<Self>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with maker")?;
self.roll_over_state = RollOverState::None;
tracing::info!(%order_id, "Settlement proposal got accepted");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
cfd.state = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker
.do_send(wire::TakerToMaker::InitiateSettlement {
order_id,
sig_taker,
})?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
#[async_trait]
impl Handler<TakeOffer> for Actor {
impl<O: 'static, M: 'static> Handler<TakeOffer> for Actor<O, M> {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity));
}
}
#[async_trait]
impl Handler<CfdAction> for Actor {
impl<O: 'static, M: 'static> Handler<CfdAction> for Actor<O, M> {
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) {
use CfdAction::*;
@ -601,7 +629,14 @@ impl Handler<CfdAction> for Actor {
}
#[async_trait]
impl Handler<MakerStreamMessage> for Actor {
impl<O: 'static, M: 'static> Handler<MakerStreamMessage> for Actor<O, M>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle(
&mut self,
message: MakerStreamMessage,
@ -654,28 +689,35 @@ impl Handler<MakerStreamMessage> for Actor {
}
#[async_trait]
impl Handler<CfdSetupCompleted> for Actor {
impl<O: 'static, M: 'static> Handler<CfdSetupCompleted> for Actor<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<CfdRollOverCompleted> for Actor {
impl<O: 'static, M: 'static> Handler<CfdRollOverCompleted> for Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<monitor::Event> for Actor {
impl<O: 'static, M: 'static> Handler<monitor::Event> for Actor<O, M> {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
impl<O: 'static, M: 'static> Handler<oracle::Attestation> for Actor<O, M> {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
@ -702,4 +744,4 @@ impl Message for CfdRollOverCompleted {
type Result = ();
}
impl xtra::Actor for Actor {}
impl<O: 'static, M: 'static> xtra::Actor for Actor<O, M> {}

Loading…
Cancel
Save