Browse Source

Use MessageChannel instead of send_to_socket::Actor

We depend on `luckysori/xtra` until
https://github.com/Restioson/xtra/pull/45 is released.
debug-statements
Lucas Soriano del Pino 3 years ago
parent
commit
20c76f6134
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 2
      Cargo.lock
  2. 2
      Cargo.toml
  3. 2
      daemon/src/taker.rs
  4. 31
      daemon/src/taker_cfd.rs

2
Cargo.lock

@ -3353,7 +3353,7 @@ dependencies = [
[[package]] [[package]]
name = "xtra" name = "xtra"
version = "0.6.0" version = "0.6.0"
source = "git+https://github.com/Restioson/xtra#7ae9140dbc30127aaa5f701078b72ac143babeb4" source = "git+https://github.com/luckysori/xtra?rev=19684c196f71ec762e595906cc88c6142b82092e#19684c196f71ec762e595906cc88c6142b82092e"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"barrage", "barrage",

2
Cargo.toml

@ -4,4 +4,4 @@ resolver = "2"
[patch.crates-io] [patch.crates-io]
rocket = { git = "https://github.com/SergioBenitez/Rocket" } # Need to patch rocket dependency of `rocket_basicauth` until there is an official release. rocket = { git = "https://github.com/SergioBenitez/Rocket" } # Need to patch rocket dependency of `rocket_basicauth` until there is an official release.
xtra = { git = "https://github.com/Restioson/xtra" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us. xtra = { git = "https://github.com/luckysori/xtra", rev = "19684c196f71ec762e595906cc88c6142b82092e" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us.

2
daemon/src/taker.rs

@ -221,7 +221,7 @@ async fn main() -> Result<()> {
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
update_cfd_feed_sender, update_cfd_feed_sender,
send_to_maker, Box::new(send_to_maker),
monitor_actor_address.clone(), monitor_actor_address.clone(),
oracle_actor_address.clone(), oracle_actor_address.clone(),
) )

31
daemon/src/taker_cfd.rs

@ -9,7 +9,7 @@ use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, send_to_socket, setup_contract, wire}; use crate::{log_error, oracle, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -74,7 +74,7 @@ pub struct Actor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor>, monitor_actor: Address<monitor::Actor>,
setup_state: SetupState, setup_state: SetupState,
roll_over_state: RollOverState, roll_over_state: RollOverState,
@ -91,7 +91,7 @@ impl Actor {
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker> + Send>,
monitor_actor: Address<monitor::Actor>, monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor>, oracle_actor: Address<oracle::Actor>,
) -> Self { ) -> Self {
@ -155,8 +155,7 @@ impl Actor {
insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .do_send(wire::TakerToMaker::TakeOrder { order_id, quantity })?;
.await?;
Ok(()) Ok(())
} }
@ -191,14 +190,13 @@ impl Actor {
self.send_pending_update_proposals()?; self.send_pending_update_proposals()?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::ProposeSettlement { .do_send(wire::TakerToMaker::ProposeSettlement {
order_id: proposal.order_id, order_id: proposal.order_id,
timestamp: proposal.timestamp, timestamp: proposal.timestamp,
taker: proposal.taker, taker: proposal.taker,
maker: proposal.maker, maker: proposal.maker,
price: proposal.price, price: proposal.price,
}) })?;
.await?;
Ok(()) Ok(())
} }
@ -222,11 +220,10 @@ impl Actor {
self.send_pending_update_proposals()?; self.send_pending_update_proposals()?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::ProposeRollOver { .do_send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id, order_id: proposal.order_id,
timestamp: proposal.timestamp, timestamp: proposal.timestamp,
}) })?;
.await?;
Ok(()) Ok(())
} }
@ -283,8 +280,8 @@ impl Actor {
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
self.send_to_maker self.send_to_maker
.clone() .sink()
.into_sink() .clone_message_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver, receiver,
(self.oracle_pk, offer_announcement), (self.oracle_pk, offer_announcement),
@ -337,11 +334,10 @@ impl Actor {
let (tx, sig_taker) = dlc.close_transaction(proposal)?; let (tx, sig_taker) = dlc.close_transaction(proposal)?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::InitiateSettlement { .do_send(wire::TakerToMaker::InitiateSettlement {
order_id, order_id,
sig_taker, sig_taker,
}) })?;
.await?;
cfd.handle(CfdStateChangeEvent::ProposalSigned( cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new( CollaborativeSettlement::new(
@ -391,8 +387,7 @@ impl Actor {
let contract_future = setup_contract::roll_over( let contract_future = setup_contract::roll_over(
self.send_to_maker self.send_to_maker
.clone() .sink()
.into_sink()
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver, receiver,
(self.oracle_pk, announcement), (self.oracle_pk, announcement),

Loading…
Cancel
Save