Browse Source

Merge #418 #420 #422

418: Stop contract and rollover from the maker if cannot notify the taker r=da-kami a=klochowicz

As for now, we were only logging the communication problem and moving on.

420: Be productive r=klochowicz a=thomaseizinger



422: Bump rust_decimal from 1.16.0 to 1.17.0 r=da-kami a=dependabot[bot]

Bumps [rust_decimal](https://github.com/paupino/rust-decimal) from 1.16.0 to 1.17.0.
<details>
<summary>Commits</summary>
<ul>
<li>See full diff in <a href="https://github.com/paupino/rust-decimal/commits">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=rust_decimal&package-manager=cargo&previous-version=1.16.0&new-version=1.17.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
contact-taker-before-changing-cfd-state
bors[bot] 3 years ago
committed by GitHub
parent
commit
35107182ba
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      Cargo.lock
  2. 2
      daemon/Cargo.toml
  3. 44
      daemon/src/maker_cfd.rs
  4. 1
      daemon/src/maker_inc_connections.rs
  5. 49
      daemon/src/monitor.rs
  6. 70
      daemon/src/oracle.rs
  7. 34
      xtra_productivity/src/lib.rs
  8. 27
      xtra_productivity/tests/pass/actor_with_generics.rs
  9. 1
      xtra_productivity/tests/pass/can_handle_message.rs

4
Cargo.lock

@ -2240,9 +2240,9 @@ dependencies = [
[[package]]
name = "rust_decimal"
version = "1.16.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f1028de22e436bb35fce070310ee57d57b5e59ae77b4e3f24ce4773312b813"
checksum = "353775f96a1f400edcca737f843cb201af3645912e741e64456a257c770173e8"
dependencies = [
"arrayvec",
"num-traits",

2
daemon/Cargo.toml

@ -24,7 +24,7 @@ reqwest = { version = "0.11", default-features = false, features = ["json", "rus
rocket = { version = "0.5.0-rc.1", features = ["json"] }
rocket-basicauth = { version = "2", default-features = false }
rust-embed = "6.2"
rust_decimal = "1.16"
rust_decimal = "1.17"
rust_decimal_macros = "1.16"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

44
daemon/src/maker_cfd.rs

@ -8,7 +8,6 @@ use crate::model::cfd::{
};
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::tokio_ext::spawn_fallible;
use crate::wallet::Wallet;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
@ -579,18 +578,12 @@ where
// Use `.send` here to ensure we only continue once the message has been sent
// Nothing done after this call should be able to fail, otherwise we notified the taker, but
// might not transition to `Active` ourselves!
spawn_fallible::<_, anyhow::Error>({
let takers = self.takers.clone();
async move {
takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: order_id },
})
.await??;
Ok(())
}
});
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: order_id },
})
.await??;
// 5. Spawn away the contract setup
let (sender, receiver) = mpsc::unbounded();
@ -763,22 +756,15 @@ where
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
spawn_fallible::<_, anyhow::Error>({
let takers = self.takers.clone();
let order_id = proposal.order_id;
async move {
takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverAccepted {
id: order_id,
oracle_event_id,
},
})
.await??;
Ok(())
}
});
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverAccepted {
id: proposal.order_id,
oracle_event_id,
},
})
.await??;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {

1
daemon/src/maker_inc_connections.rs

@ -3,7 +3,6 @@ use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::{forward_only_ok, maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::io;

49
daemon/src/monitor.rs

@ -15,6 +15,7 @@ use std::fmt;
use std::marker::Send;
use std::ops::{Add, RangeInclusive};
use xtra::prelude::StrongMessageChannel;
use xtra_productivity::xtra_productivity;
const FINALITY_CONFIRMATIONS: u32 = 1;
@ -320,16 +321,6 @@ where
Ok(())
}
fn handle_collaborative_settlement(
&mut self,
collaborative_settlement: CollaborativeSettlement,
) {
self.monitor_close_finality(
collaborative_settlement.tx,
collaborative_settlement.order_id,
);
}
async fn update_state(
&mut self,
latest_block_height: BlockHeight,
@ -530,14 +521,6 @@ impl Add<u32> for BlockHeight {
}
}
impl xtra::Message for StartMonitoring {
type Result = ();
}
impl xtra::Message for CollaborativeSettlement {
type Result = ();
}
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
LockFinality(OrderId),
@ -630,17 +613,31 @@ impl xtra::Message for Sync {
impl<C> xtra::Actor for Actor<C> where C: Send + 'static {}
#[async_trait]
impl<C> xtra::Handler<StartMonitoring> for Actor<C>
#[xtra_productivity]
impl<C> Actor<C>
where
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) {
async fn handle_start_monitoring(
&mut self,
msg: StartMonitoring,
_ctx: &mut xtra::Context<Self>,
) {
let StartMonitoring { id, params } = msg;
self.monitor_all(&params, id);
self.cfds.insert(id, params);
}
fn handle_collaborative_settlement(
&mut self,
collaborative_settlement: CollaborativeSettlement,
) {
self.monitor_close_finality(
collaborative_settlement.tx,
collaborative_settlement.order_id,
);
}
}
#[async_trait]
impl<C> xtra::Handler<Sync> for Actor<C>
@ -659,16 +656,6 @@ impl xtra::Handler<oracle::Attestation> for Actor {
}
}
#[async_trait]
impl<C> xtra::Handler<CollaborativeSettlement> for Actor<C>
where
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, msg: CollaborativeSettlement, _ctx: &mut xtra::Context<Self>) {
self.handle_collaborative_settlement(msg);
}
}
#[cfg(test)]
mod tests {
use super::*;

70
daemon/src/oracle.rs

@ -10,6 +10,7 @@ use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
use xtra::prelude::StrongMessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
@ -104,9 +105,7 @@ impl Actor {
attestation_channel,
}
}
}
impl Actor {
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_announcements.iter().cloned() {
let this = ctx.address().expect("self to be alive");
@ -140,9 +139,7 @@ impl Actor {
});
}
}
}
impl Actor {
fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_attestations.iter().copied() {
if !event_id.has_likely_occured() {
@ -199,27 +196,29 @@ impl Actor {
}
}
#[async_trait]
impl xtra::Handler<MonitorAttestation> for Actor {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
#[xtra_productivity]
impl Actor {
fn handle_monitor_attestation(
&mut self,
msg: MonitorAttestation,
_ctx: &mut xtra::Context<Self>,
) {
if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id);
}
}
}
#[async_trait]
impl xtra::Handler<FetchAnnouncement> for Actor {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
fn handle_fetch_announcement(
&mut self,
msg: FetchAnnouncement,
_ctx: &mut xtra::Context<Self>,
) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
}
#[async_trait]
impl xtra::Handler<GetAnnouncement> for Actor {
async fn handle(
fn handle_get_announcement(
&mut self,
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
@ -239,15 +238,21 @@ impl xtra::Handler<GetAnnouncement> for Actor {
announcement
}
}
#[async_trait]
impl xtra::Handler<NewAnnouncementFetched> for Actor {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
fn handle_new_announcement_fetched(
&mut self,
msg: NewAnnouncementFetched,
_ctx: &mut xtra::Context<Self>,
) {
self.pending_announcements.remove(&msg.id);
self.announcements
.insert(msg.id, (msg.expected_outcome_time, msg.nonce_pks));
}
fn handle_sync(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
#[async_trait]
@ -295,37 +300,10 @@ impl From<Announcement> for cfd_protocol::Announcement {
impl xtra::Actor for Actor {}
#[async_trait]
impl xtra::Handler<Sync> for Actor {
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
impl xtra::Message for Sync {
type Result = ();
}
impl xtra::Message for MonitorAttestation {
type Result = ();
}
impl xtra::Message for FetchAnnouncement {
type Result = ();
}
impl xtra::Message for GetAnnouncement {
type Result = Option<Announcement>;
}
impl xtra::Message for Attestation {
type Result = ();
}
impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
impl xtra::Message for NewAttestationFetched {
type Result = ();
}

34
xtra_productivity/src/lib.rs

@ -1,6 +1,6 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{FnArg, ImplItem, ItemImpl, ReturnType};
use syn::{FnArg, GenericParam, ImplItem, ItemImpl, ReturnType};
#[proc_macro_attribute]
pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStream {
@ -8,6 +8,30 @@ pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStr
let actor = block.self_ty;
let generic_params = &block.generics.params;
let generic_types = block
.generics
.params
.iter()
.filter_map(|param| match param {
GenericParam::Type(ty) => Some(ty.ident.clone()),
_ => None,
})
.collect::<Vec<_>>();
let additional_bounds = block
.generics
.where_clause
.map(|bounds| {
let predicates = bounds.predicates;
quote! {
#predicates
}
})
.unwrap_or_default();
let code = block
.items
.into_iter()
@ -41,8 +65,12 @@ pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStr
type Result = #result_type;
}
#[async_trait]
impl xtra::Handler<#message_type> for #actor {
#[async_trait::async_trait]
impl<#generic_params> xtra::Handler<#message_type> for #actor
where
#additional_bounds
#(#generic_types: Send + 'static),*
{
async fn handle(&mut self, #message_arg, #context_arg) #method_return #method_block
}
}

27
xtra_productivity/tests/pass/actor_with_generics.rs

@ -0,0 +1,27 @@
use std::marker::PhantomData;
use xtra_productivity::xtra_productivity;
struct ActorWithParam<C> {
ty: PhantomData<C>,
}
struct DummyMessage;
trait Foo {}
impl<C: 'static + Send> xtra::Actor for ActorWithParam<C> {}
// Dummy actor, xtra::Handler and xtra::Message impls generated by xtra_productivity
#[xtra_productivity]
impl<C> ActorWithParam<C>
where
C: Foo,
{
pub fn handle_dummy_message(&mut self, _message: DummyMessage) {
assert_impls_foo::<C>();
}
}
fn assert_impls_foo<T: Foo>() {}
fn main() {}

1
xtra_productivity/tests/pass/can_handle_message.rs

@ -1,4 +1,3 @@
use async_trait::async_trait;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
use xtra_productivity::xtra_productivity;

Loading…
Cancel
Save