Browse Source

Merge #738

738: Introduce `AddressMap` primitive r=thomaseizinger a=thomaseizinger

This should remove some duplication as we will repeat this pattern
a couple of times.


Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
chore/leaner-release-process
bors[bot] 3 years ago
committed by GitHub
parent
commit
56754d7b17
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      daemon/src/address_map.rs
  2. 1
      daemon/src/lib.rs
  3. 31
      daemon/src/taker_cfd.rs

51
daemon/src/address_map.rs

@ -0,0 +1,51 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::Hash;
pub struct AddressMap<K, A> {
inner: HashMap<K, xtra::Address<A>>,
}
impl<K, A> Default for AddressMap<K, A> {
fn default() -> Self {
Self {
inner: HashMap::new(),
}
}
}
impl<K, A> AddressMap<K, A>
where
K: Eq + Hash,
{
pub fn get_disconnected(&mut self, key: K) -> Result<Disconnected<'_, K, A>, StillConnected> {
let entry = self.inner.entry(key);
if matches!(entry, Entry::Occupied(ref occupied) if occupied.get().is_connected()) {
return Err(StillConnected);
}
Ok(Disconnected { entry })
}
}
#[derive(thiserror::Error, Debug)]
#[error("The address is still connected")]
pub struct StillConnected;
pub struct Disconnected<'a, K, A> {
entry: Entry<'a, K, xtra::Address<A>>,
}
impl<'a, K, A> Disconnected<'a, K, A> {
pub fn insert(self, address: xtra::Address<A>) {
match self.entry {
Entry::Occupied(mut occ) => {
occ.insert(address);
}
Entry::Vacant(vacc) => {
vacc.insert(address);
}
};
}
}

1
daemon/src/lib.rs

@ -20,6 +20,7 @@ use xtra::{Actor, Address};
pub mod sqlx_ext; // Must come first because it is a macro.
pub mod actors;
pub mod address_map;
pub mod auth;
pub mod bitmex_price_feed;
pub mod cfd_actors;

31
daemon/src/taker_cfd.rs

@ -1,3 +1,4 @@
use crate::address_map::AddressMap;
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
@ -19,7 +20,6 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use xtra::prelude::*;
use xtra::Actor as _;
@ -62,7 +62,7 @@ pub struct Actor<O, M, W> {
projection_actor: Address<projection::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>,
roll_over_state: RollOverState,
oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals,
@ -98,7 +98,7 @@ where
oracle_actor,
current_pending_proposals: HashMap::new(),
n_payouts,
setup_actors: HashMap::new(),
setup_actors: AddressMap::default(),
tasks: Tasks::default(),
}
}
@ -360,13 +360,15 @@ where
quantity: Usd,
ctx: &mut Context<Self>,
) -> Result<()> {
let entry = self.setup_actors.entry(order_id);
if matches!(entry, Entry::Occupied(ref occupied) if occupied.get().is_connected()) {
bail!(
"A contract setup for order id {} is already in progress",
order_id
)
}
let disconnected = self
.setup_actors
.get_disconnected(order_id)
.with_context(|| {
format!(
"Contract setup for order {} is already in progress",
order_id
)
})?;
let mut conn = self.db.acquire().await?;
@ -408,14 +410,7 @@ where
.create(None)
.run();
match entry {
Entry::Occupied(mut disconnected) => {
disconnected.insert(addr);
}
Entry::Vacant(vacant) => {
vacant.insert(addr);
}
}
disconnected.insert(addr);
self.tasks.add(fut);

Loading…
Cancel
Save