Browse Source

Allow running taker without connected maker

Instead of blocking and awaiting for connection to maker before the start, spawn
the connection attempt in a separate task that will also attempt to reconnect if
the maker goes offline.

Extend the unit test to cover the new behaviour.

Do not shutdown the taker upon connection loss anymore.

Co-authored-by: rishflab <rishflab@hotmail.com>
debug-collab-settlement
Mariusz Klochowicz 3 years ago
parent
commit
f503d19824
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 51
      daemon/src/connection.rs
  2. 10
      daemon/src/routes_taker.rs
  3. 1
      daemon/src/seed.rs
  4. 71
      daemon/src/taker.rs
  5. 12
      daemon/src/to_sse_event.rs
  6. 25
      daemon/tests/happy_path.rs
  7. 41
      daemon/tests/harness/mod.rs

51
daemon/src/connection.rs

@ -11,6 +11,9 @@ use xtra::prelude::MessageChannel;
use xtra::KeepRunning;
use xtra_productivity::xtra_productivity;
/// Time between reconnection attempts
const CONNECT_TO_MAKER_INTERVAL: Duration = Duration::from_secs(5);
struct ConnectedState {
last_heartbeat: SystemTime,
_tasks: Tasks,
@ -172,3 +175,51 @@ impl Actor {
}
impl xtra::Actor for Actor {}
// TODO: Move the reconnection logic inside the connection::Actor instead of
// depending on a watch channel
pub async fn connect(
mut maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
connection_actor_addr: xtra::Address<Actor>,
maker_identity_pk: x25519_dalek::PublicKey,
maker_addresses: Vec<SocketAddr>,
) {
loop {
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline {
tracing::info!("No connection to the maker, attempting to connect:");
'connect: loop {
for address in &maker_addresses {
tracing::trace!("Connecting to {}", address);
let connect_msg = Connect {
maker_identity_pk,
maker_addr: *address,
};
if let Err(e) = connection_actor_addr
.send(connect_msg)
.await
.expect("Taker actor to be present")
{
tracing::trace!(%address, "Failed to establish connection: {:#}", e);
continue;
}
tracing::debug!("Connection established");
break 'connect;
}
tracing::debug!(
"Tried connecting to {} addresses without success, retrying in {} seconds",
maker_addresses.len(),
CONNECT_TO_MAKER_INTERVAL.as_secs()
);
tokio::time::sleep(CONNECT_TO_MAKER_INTERVAL).await;
}
}
maker_online_status_feed_receiver
.changed()
.await
.expect("watch channel should outlive the future");
}
}

10
daemon/src/routes_taker.rs

@ -1,4 +1,5 @@
use bdk::bitcoin::{Amount, Network};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::{calculate_long_margin, Cfd, Order, OrderId, Role, UpdateCfdProposals};
use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::routes::EmbeddedFileExt;
@ -25,6 +26,7 @@ pub async fn feed(
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<UpdateCfdProposals>>,
rx_maker_status: &State<watch::Receiver<ConnectionStatus>>,
network: &State<Network>,
) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone();
@ -32,12 +34,16 @@ pub async fn feed(
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone();
let mut rx_maker_status = rx_maker_status.inner().clone();
let network = *network.inner();
EventStream! {
let wallet_info = rx_wallet.borrow().clone();
yield wallet_info.to_sse_event();
let maker_status = rx_maker_status.borrow().clone();
yield maker_status.to_sse_event();
let order = rx_order.borrow().clone();
yield order.to_sse_event();
@ -58,6 +64,10 @@ pub async fn feed(
let wallet_info = rx_wallet.borrow().clone();
yield wallet_info.to_sse_event();
},
Ok(()) = rx_maker_status.changed() => {
let maker_status = rx_maker_status.borrow().clone();
yield maker_status.to_sse_event();
},
Ok(()) = rx_order.changed() => {
let order = rx_order.borrow().clone();
yield order.to_sse_event();

1
daemon/src/seed.rs

@ -7,6 +7,7 @@ use sha2::Sha256;
use std::convert::TryInto;
use std::path::Path;
#[derive(Copy, Clone)]
pub struct Seed([u8; 256]);
impl Seed {

71
daemon/src/taker.rs

@ -4,16 +4,15 @@ use bdk::bitcoin::{Address, Amount};
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::bitmex_price_feed::Quote;
use daemon::connection::ConnectionStatus;
use daemon::connection::connect;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, projection,
taker_cfd, wallet, wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
SETTLEMENT_INTERVAL,
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, taker_cfd, wallet,
wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -21,9 +20,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use tracing_subscriber::filter::LevelFilter;
use watch::channel;
use xtra::prelude::MessageChannel;
@ -31,7 +28,7 @@ use xtra::Actor;
mod routes_taker;
const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5);
pub const ANNOUNCEMENT_LOOKAHEAD: time::Duration = time::Duration::hours(24);
#[derive(Parser)]
struct Opts {
@ -238,7 +235,7 @@ async fn main() -> Result<()> {
let TakerActorSystem {
cfd_actor_addr,
connection_actor_addr,
mut maker_online_status_feed_receiver,
maker_online_status_feed_receiver,
tasks: _tasks,
} = TakerActorSystem::new(
db.clone(),
@ -274,7 +271,14 @@ async fn main() -> Result<()> {
update_cfd_feed_sender,
)));
connect(connection_actor_addr, opts.maker_id, opts.maker).await?;
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;
tasks.add(connect(
maker_online_status_feed_receiver.clone(),
connection_actor_addr,
opts.maker_id,
possible_addresses,
));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
@ -290,6 +294,7 @@ async fn main() -> Result<()> {
.manage(quote_receiver)
.manage(bitcoin_network)
.manage(wallet)
.manage(maker_online_status_feed_receiver)
.mount(
"/api",
rocket::routes![
@ -307,19 +312,6 @@ async fn main() -> Result<()> {
);
let rocket = rocket.ignite().await?;
let shutdown_handle = rocket.shutdown();
// shutdown the rocket server maker if goes offline
tasks.add(async move {
loop {
maker_online_status_feed_receiver.changed().await.unwrap();
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline {
tracing::info!("Lost connection to maker, shutting down. Please restart the daemon to reconnect");
rocket::Shutdown::notify(shutdown_handle);
return;
}
}
});
rocket.launch().await?;
db.close().await;
@ -327,12 +319,8 @@ async fn main() -> Result<()> {
Ok(())
}
async fn connect(
connection_actor_addr: xtra::Address<connection::Actor>,
maker_identity_pk: x25519_dalek::PublicKey,
maker_addr: String,
) -> Result<()> {
let possible_addresses = tokio::net::lookup_host(&maker_addr)
async fn resolve_maker_addresses(maker_addr: &str) -> Result<Vec<SocketAddr>> {
let possible_addresses = tokio::net::lookup_host(maker_addr)
.await?
.collect::<Vec<_>>();
@ -341,30 +329,5 @@ async fn connect(
maker_addr,
itertools::join(possible_addresses.iter(), ",")
);
loop {
for address in &possible_addresses {
tracing::trace!("Connecting to {}", address);
let connect_msg = connection::Connect {
maker_identity_pk,
maker_addr: *address,
};
if let Err(e) = connection_actor_addr.send(connect_msg).await? {
tracing::debug!(%address, "Failed to establish connection: {:#}", e);
continue;
}
return Ok(());
}
tracing::debug!(
"Tried connecting to {} addresses without success, retrying in {} seconds",
possible_addresses.len(),
CONNECTION_RETRY_INTERVAL.as_secs()
);
sleep(CONNECTION_RETRY_INTERVAL).await;
}
Ok(possible_addresses)
}

12
daemon/src/to_sse_event.rs

@ -1,3 +1,4 @@
use crate::connection::ConnectionStatus;
use crate::model::cfd::{
Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
};
@ -369,6 +370,17 @@ impl ToSseEvent for model::WalletInfo {
}
}
impl ToSseEvent for ConnectionStatus {
fn to_sse_event(&self) -> Event {
let connected = match self {
ConnectionStatus::Online => true,
ConnectionStatus::Offline => false,
};
Event::json(&connected).event("maker_status")
}
}
fn to_cfd_state(
cfd_state: &model::cfd::CfdState,
proposal_status: Option<&UpdateCfdProposal>,

25
daemon/tests/happy_path.rs

@ -1,6 +1,7 @@
use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some};
use crate::harness::{
assert_is_same_order, dummy_new_order, init_tracing, start_both, HEARTBEAT_INTERVAL_FOR_TEST,
assert_is_same_order, dummy_new_order, init_tracing, oracle_pk, start_both, Maker, Taker,
HEARTBEAT_INTERVAL_FOR_TEST,
};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState;
@ -118,7 +119,16 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
async fn taker_notices_lack_of_maker() {
let _guard = init_tracing();
let (maker, mut taker) = start_both().await;
let seed = daemon::seed::Seed::default();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let maker = Maker::start(oracle_pk(), seed, listener).await;
let mut taker = Taker::start(oracle_pk(), maker.listen_addr, maker.identity_pk).await;
assert_eq!(
ConnectionStatus::Online,
next(taker.maker_status_feed()).await.unwrap()
@ -132,4 +142,15 @@ async fn taker_notices_lack_of_maker() {
ConnectionStatus::Offline,
next(taker.maker_status_feed()).await.unwrap(),
);
let listener = tokio::net::TcpListener::bind(local_addr).await.unwrap();
let _maker = Maker::start(oracle_pk(), seed, listener).await;
sleep(HEARTBEAT_INTERVAL_FOR_TEST * 3).await;
assert_eq!(
ConnectionStatus::Online,
next(taker.maker_status_feed()).await.unwrap(),
);
}

41
daemon/tests/harness/mod.rs

@ -3,7 +3,7 @@ use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{Connect, ConnectionStatus};
use daemon::connection::{connect, ConnectionStatus};
use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::{Price, Timestamp, Usd};
@ -18,6 +18,7 @@ use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing::subscriber::DefaultGuard;
@ -34,14 +35,18 @@ pub mod mocks;
pub const HEARTBEAT_INTERVAL_FOR_TEST: Duration = Duration::from_secs(2);
const N_PAYOUTS_FOR_TEST: usize = 5;
pub async fn start_both() -> (Maker, Taker) {
let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str(
pub fn oracle_pk() -> schnorrsig::PublicKey {
schnorrsig::PublicKey::from_str(
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)
.unwrap();
.unwrap()
}
let maker = Maker::start(oracle_pk).await;
let taker = Taker::start(oracle_pk, maker.listen_addr, maker.identity_pk).await;
pub async fn start_both() -> (Maker, Taker) {
let maker_seed = Seed::default();
let maker_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let maker = Maker::start(oracle_pk(), maker_seed, maker_listener).await;
let taker = Taker::start(oracle_pk(), maker.listen_addr, maker.identity_pk).await;
(maker, taker)
}
@ -66,7 +71,11 @@ impl Maker {
&mut self.order_feed_receiver
}
pub async fn start(oracle_pk: schnorrsig::PublicKey) -> Self {
pub async fn start(
oracle_pk: schnorrsig::PublicKey,
seed: Seed,
listener: TcpListener,
) -> Self {
let db = in_memory_db().await;
let mut mocks = mocks::Mocks::default();
@ -79,7 +88,6 @@ impl Maker {
let settlement_interval = time::Duration::hours(24);
let seed = Seed::default();
let (identity_pk, identity_sk) = seed.derive_identity();
let (projection_actor, projection_context) = xtra::Context::new(None);
@ -126,8 +134,6 @@ impl Maker {
update_cfd_feed_sender,
)));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
@ -261,15 +267,12 @@ impl Taker {
update_cfd_feed_sender,
)));
taker
.connection_actor_addr
.send(Connect {
maker_identity_pk: maker_noise_pub_key,
maker_addr: maker_address,
})
.await
.unwrap()
.unwrap();
tasks.add(connect(
taker.maker_online_status_feed_receiver.clone(),
taker.connection_actor_addr.clone(),
maker_noise_pub_key,
vec![maker_address],
));
Self {
system: taker,

Loading…
Cancel
Save