Browse Source

Merge #313

313: Replace `Address` with `MessageChannel` in some key places r=luckysori a=luckysori

We may need to use `MessageChannel` instead of `Address` elsewhere (perhaps everywhere), but these definitely need to change.

---

Work towards #231.

Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
5c9a8e3795
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      daemon/src/maker.rs
  2. 8
      daemon/src/maker_cfd.rs
  3. 59
      daemon/src/monitor.rs
  4. 59
      daemon/src/oracle.rs
  5. 6
      daemon/src/taker.rs
  6. 8
      daemon/src/taker_cfd.rs

6
daemon/src/maker.rs

@ -260,9 +260,11 @@ async fn main() -> Result<()> {
.unwrap(), .unwrap(),
); );
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_maker_actor_inbox.clone(),
monitor_actor_address,
cfds, cfds,
[
Box::new(cfd_maker_actor_inbox.clone()),
Box::new(monitor_actor_address),
],
))); )));
oracle_actor_address oracle_actor_address

8
daemon/src/maker_cfd.rs

@ -86,10 +86,10 @@ pub struct Actor {
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor>,
setup_state: SetupState, setup_state: SetupState,
roll_over_state: RollOverState, roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor>,
// Maker needs to also store TakerId to be able to send a reply back // Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>, current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
// TODO: Persist instead of using an in-memory hashmap for resiliency? // TODO: Persist instead of using an in-memory hashmap for resiliency?
@ -124,8 +124,8 @@ impl Actor {
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor>,
) -> Self { ) -> Self {
Self { Self {
db, db,

59
daemon/src/monitor.rs

@ -14,6 +14,7 @@ use std::convert::{TryFrom, TryInto};
use std::fmt; use std::fmt;
use std::marker::Send; use std::marker::Send;
use std::ops::{Add, RangeInclusive}; use std::ops::{Add, RangeInclusive};
use xtra::prelude::StrongMessageChannel;
const FINALITY_CONFIRMATIONS: u32 = 1; const FINALITY_CONFIRMATIONS: u32 = 1;
@ -33,26 +34,19 @@ pub struct MonitorParams {
pub struct Sync; pub struct Sync;
pub struct Actor<T, C = bdk::electrum_client::Client> pub struct Actor<C = bdk::electrum_client::Client> {
where
T: xtra::Actor,
{
cfds: HashMap<OrderId, MonitorParams>, cfds: HashMap<OrderId, MonitorParams>,
cfd_actor_addr: xtra::Address<T>, event_channel: Box<dyn StrongMessageChannel<Event>>,
client: C, client: C,
latest_block_height: BlockHeight, latest_block_height: BlockHeight,
current_status: BTreeMap<(Txid, Script), ScriptStatus>, current_status: BTreeMap<(Txid, Script), ScriptStatus>,
awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>, awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>,
} }
impl<T> Actor<T, bdk::electrum_client::Client> impl Actor<bdk::electrum_client::Client> {
where
T: xtra::Actor + xtra::Handler<Event>,
{
pub async fn new( pub async fn new(
electrum_rpc_url: &str, electrum_rpc_url: &str,
cfd_actor_addr: xtra::Address<T>, event_channel: impl StrongMessageChannel<Event> + 'static,
cfds: Vec<Cfd>, cfds: Vec<Cfd>,
) -> Result<Self> { ) -> Result<Self> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url) let client = bdk::electrum_client::Client::new(electrum_rpc_url)
@ -66,7 +60,7 @@ where
let mut actor = Self { let mut actor = Self {
cfds: HashMap::new(), cfds: HashMap::new(),
cfd_actor_addr, event_channel: Box::new(event_channel),
client, client,
latest_block_height: BlockHeight::try_from(latest_block)?, latest_block_height: BlockHeight::try_from(latest_block)?,
current_status: BTreeMap::default(), current_status: BTreeMap::default(),
@ -131,7 +125,7 @@ where
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?; actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.order.id);
} }
CfdState::PendingClose { collaborative_close, .. } => { CfdState::PendingClose { collaborative_close, .. } => {
let transaction = collaborative_close.tx; let transaction = collaborative_close.tx;
let close_params = (transaction.txid(), let close_params = (transaction.txid(),
@ -164,9 +158,8 @@ where
} }
} }
impl<T, C> Actor<T, C> impl<C> Actor<C>
where where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi, C: bdk::electrum_client::ElectrumApi,
{ {
fn monitor_all(&mut self, params: &MonitorParams, order_id: OrderId) { fn monitor_all(&mut self, params: &MonitorParams, order_id: OrderId) {
@ -415,7 +408,7 @@ where
for (target_status, event) in reached_monitoring_target { for (target_status, event) in reached_monitoring_target {
tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target");
self.cfd_actor_addr.send(event).await?; self.event_channel.send(event).await?;
} }
} }
} }
@ -622,18 +615,11 @@ impl xtra::Message for Sync {
type Result = (); type Result = ();
} }
impl<T, C> xtra::Actor for Actor<T, C> impl<C> xtra::Actor for Actor<C> where C: Send + 'static {}
where
T: xtra::Actor,
C: Send,
C: 'static,
{
}
#[async_trait] #[async_trait]
impl<T, C> xtra::Handler<StartMonitoring> for Actor<T, C> impl<C> xtra::Handler<StartMonitoring> for Actor<C>
where where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi + Send + 'static, C: bdk::electrum_client::ElectrumApi + Send + 'static,
{ {
async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) {
@ -644,9 +630,8 @@ where
} }
} }
#[async_trait] #[async_trait]
impl<T, C> xtra::Handler<Sync> for Actor<T, C> impl<C> xtra::Handler<Sync> for Actor<C>
where where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi + Send + 'static, C: bdk::electrum_client::ElectrumApi + Send + 'static,
{ {
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
@ -655,10 +640,7 @@ where
} }
#[async_trait] #[async_trait]
impl<T> xtra::Handler<oracle::Attestation> for Actor<T> impl xtra::Handler<oracle::Attestation> for Actor {
where
T: xtra::Actor + xtra::Handler<Event>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_oracle_attestation(msg)); log_error!(self.handle_oracle_attestation(msg));
} }
@ -690,7 +672,7 @@ mod tests {
let refund_expired = Event::RefundTimelockExpired(OrderId::default()); let refund_expired = Event::RefundTimelockExpired(OrderId::default());
let mut monitor = Actor::for_test( let mut monitor = Actor::for_test(
recorder_address, Box::new(recorder_address),
[( [(
(txid1(), script1()), (txid1(), script1()),
vec![ vec![
@ -736,7 +718,7 @@ mod tests {
let refund_finality = Event::RefundFinality(OrderId::default()); let refund_finality = Event::RefundFinality(OrderId::default());
let mut monitor = Actor::for_test( let mut monitor = Actor::for_test(
recorder_address, Box::new(recorder_address),
[ [
( (
(txid1(), script1()), (txid1(), script1()),
@ -773,7 +755,7 @@ mod tests {
let cet_finality = Event::CetFinality(OrderId::default()); let cet_finality = Event::CetFinality(OrderId::default());
let mut monitor = Actor::for_test( let mut monitor = Actor::for_test(
recorder_address, Box::new(recorder_address),
[( [(
(txid1(), script1()), (txid1(), script1()),
vec![(ScriptStatus::finality(), cet_finality.clone())], vec![(ScriptStatus::finality(), cet_finality.clone())],
@ -790,18 +772,15 @@ mod tests {
assert!(monitor.awaiting_status.is_empty()); assert!(monitor.awaiting_status.is_empty());
} }
impl<A> Actor<A, stub::Client> impl Actor<stub::Client> {
where
A: xtra::Actor + xtra::Handler<Event>,
{
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn for_test<const N: usize>( fn for_test<const N: usize>(
address: xtra::Address<A>, event_channel: Box<dyn StrongMessageChannel<Event>>,
subscriptions: [((Txid, Script), Vec<(ScriptStatus, Event)>); N], subscriptions: [((Txid, Script), Vec<(ScriptStatus, Event)>); N],
) -> Self { ) -> Self {
Actor { Actor {
cfds: HashMap::default(), cfds: HashMap::default(),
cfd_actor_addr: address, event_channel,
client: stub::Client::default(), client: stub::Client::default(),
latest_block_height: BlockHeight(0), latest_block_height: BlockHeight(0),
current_status: BTreeMap::default(), current_status: BTreeMap::default(),

59
daemon/src/oracle.rs

@ -9,13 +9,13 @@ use serde::Deserialize;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ops::Add; use std::ops::Add;
use time::ext::NumericalDuration; use time::ext::NumericalDuration;
use xtra::prelude::StrongMessageChannel;
pub struct Actor<CFD, M> { pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>, announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>, pending_announcements: HashSet<BitMexPriceEventId>,
pending_attestations: HashSet<BitMexPriceEventId>, pending_attestations: HashSet<BitMexPriceEventId>,
cfd_actor_address: xtra::Address<CFD>, attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2],
monitor_actor_address: xtra::Address<M>,
} }
pub struct Sync; pub struct Sync;
@ -64,11 +64,10 @@ struct NewAttestationFetched {
attestation: Attestation, attestation: Attestation,
} }
impl<CFD, M> Actor<CFD, M> { impl Actor {
pub fn new( pub fn new(
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
cfds: Vec<Cfd>, cfds: Vec<Cfd>,
attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2],
) -> Self { ) -> Self {
let mut pending_attestations = HashSet::new(); let mut pending_attestations = HashSet::new();
@ -103,17 +102,12 @@ impl<CFD, M> Actor<CFD, M> {
announcements: HashMap::new(), announcements: HashMap::new(),
pending_announcements: HashSet::new(), pending_announcements: HashSet::new(),
pending_attestations, pending_attestations,
cfd_actor_address, attestation_channels,
monitor_actor_address,
} }
} }
} }
impl<CFD, M> Actor<CFD, M> impl Actor {
where
CFD: 'static,
M: 'static,
{
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) { fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_announcements.iter().cloned() { for event_id in self.pending_announcements.iter().cloned() {
let this = ctx.address().expect("self to be alive"); let this = ctx.address().expect("self to be alive");
@ -149,11 +143,7 @@ where
} }
} }
impl<CFD, M> Actor<CFD, M> impl Actor {
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) { fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_attestations.iter().copied() { for event_id in self.pending_attestations.iter().copied() {
if !event_id.has_likely_occured() { if !event_id.has_likely_occured() {
@ -203,14 +193,9 @@ where
) -> Result<()> { ) -> Result<()> {
tracing::info!("Fetched new attestation for {}", id); tracing::info!("Fetched new attestation for {}", id);
self.cfd_actor_address for channel in self.attestation_channels.iter() {
.clone() channel.do_send(attestation.clone())?;
.do_send_async(attestation.clone()) }
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&id); self.pending_attestations.remove(&id);
@ -219,7 +204,7 @@ where
} }
#[async_trait] #[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD, M> { impl xtra::Handler<MonitorAttestation> for Actor {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
if !self.pending_attestations.insert(msg.event_id) { if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id); tracing::trace!("Attestation {} already being monitored", msg.event_id);
@ -228,7 +213,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD,
} }
#[async_trait] #[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M> { impl xtra::Handler<FetchAnnouncement> for Actor {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
if !self.pending_announcements.insert(msg.0) { if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0); tracing::trace!("Announcement {} already being fetched", msg.0);
@ -237,7 +222,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M
} }
#[async_trait] #[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M> { impl xtra::Handler<GetAnnouncement> for Actor {
async fn handle( async fn handle(
&mut self, &mut self,
msg: GetAnnouncement, msg: GetAnnouncement,
@ -254,7 +239,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
} }
#[async_trait] #[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<CFD, M> { impl xtra::Handler<NewAnnouncementFetched> for Actor {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
self.pending_announcements.remove(&msg.id); self.pending_announcements.remove(&msg.id);
self.announcements self.announcements
@ -263,11 +248,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<C
} }
#[async_trait] #[async_trait]
impl<CFD, M> xtra::Handler<NewAttestationFetched> for Actor<CFD, M> impl xtra::Handler<NewAttestationFetched> for Actor {
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation));
} }
@ -310,14 +291,10 @@ impl From<Announcement> for cfd_protocol::Announcement {
} }
} }
impl<CFD: 'static, M: 'static> xtra::Actor for Actor<CFD, M> {} impl xtra::Actor for Actor {}
#[async_trait] #[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M> impl xtra::Handler<Sync> for Actor {
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx); self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx); self.update_pending_attestations(ctx);

6
daemon/src/taker.rs

@ -254,9 +254,11 @@ async fn main() -> Result<()> {
.unwrap(), .unwrap(),
); );
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_actor_inbox.clone(),
monitor_actor_address,
cfds, cfds,
[
Box::new(cfd_actor_inbox.clone()),
Box::new(monitor_actor_address),
],
))); )));
oracle_actor_address oracle_actor_address

8
daemon/src/taker_cfd.rs

@ -77,10 +77,10 @@ pub struct Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor>,
setup_state: SetupState, setup_state: SetupState,
roll_over_state: RollOverState, roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor>,
current_pending_proposals: UpdateCfdProposals, current_pending_proposals: UpdateCfdProposals,
} }
@ -94,8 +94,8 @@ impl Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>, update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor>,
) -> Self { ) -> Self {
Self { Self {
db, db,

Loading…
Cancel
Save