|
|
@ -2,11 +2,11 @@ use crate::address_map::{AddressMap, Stopping}; |
|
|
|
use crate::model::cfd::OrderId; |
|
|
|
use crate::model::{Identity, Price, Timestamp, Usd}; |
|
|
|
use crate::tokio_ext::FutureExt; |
|
|
|
use crate::wire::EncryptedJsonCodec; |
|
|
|
use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; |
|
|
|
use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; |
|
|
|
use anyhow::{Context, Result}; |
|
|
|
use anyhow::{bail, Context, Result}; |
|
|
|
use bdk::bitcoin::Amount; |
|
|
|
use futures::StreamExt; |
|
|
|
use futures::{SinkExt, StreamExt, TryStreamExt}; |
|
|
|
use std::collections::HashMap; |
|
|
|
use std::net::SocketAddr; |
|
|
|
use std::sync::{Arc, Mutex}; |
|
|
@ -55,7 +55,17 @@ struct MeasurePulse; |
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
pub enum ConnectionStatus { |
|
|
|
Online, |
|
|
|
Offline, |
|
|
|
Offline { |
|
|
|
reason: Option<ConnectionCloseReason>, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
pub enum ConnectionCloseReason { |
|
|
|
VersionMismatch { |
|
|
|
taker_version: Version, |
|
|
|
maker_version: Version, |
|
|
|
}, |
|
|
|
} |
|
|
|
|
|
|
|
/// Message sent from the `setup_taker::Actor` to the
|
|
|
@ -197,8 +207,51 @@ impl Actor { |
|
|
|
(read, write, Arc::new(Mutex::new(noise))) |
|
|
|
}; |
|
|
|
|
|
|
|
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); |
|
|
|
let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); |
|
|
|
let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); |
|
|
|
let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); |
|
|
|
|
|
|
|
let our_version = Version::current(); |
|
|
|
write.send(TakerToMaker::Hello(our_version.clone())).await?; |
|
|
|
|
|
|
|
match read |
|
|
|
.try_next() |
|
|
|
.timeout(Duration::from_secs(10)) |
|
|
|
.await |
|
|
|
.with_context(|| { |
|
|
|
format!( |
|
|
|
"Maker {} did not send Hello within 10 seconds, dropping connection", |
|
|
|
maker_identity |
|
|
|
) |
|
|
|
})? { |
|
|
|
Ok(Some(wire::MakerToTaker::Hello(maker_version))) => { |
|
|
|
if our_version != maker_version { |
|
|
|
self.status_sender |
|
|
|
.send(ConnectionStatus::Offline { |
|
|
|
reason: Some(ConnectionCloseReason::VersionMismatch { |
|
|
|
taker_version: our_version.clone(), |
|
|
|
maker_version: maker_version.clone(), |
|
|
|
}), |
|
|
|
}) |
|
|
|
.expect("receiver to outlive the actor"); |
|
|
|
|
|
|
|
bail!( |
|
|
|
"Network version mismatch, we are on version {} but taker is on version {}", |
|
|
|
our_version, |
|
|
|
maker_version, |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
unexpected_message => { |
|
|
|
bail!( |
|
|
|
"Unexpected message {:?} from maker {}", |
|
|
|
unexpected_message, |
|
|
|
maker_identity |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
tracing::info!(address = %maker_addr, "Established connection to maker"); |
|
|
|
|
|
|
|
let this = ctx.address().expect("self to be alive"); |
|
|
|
|
|
|
|
let send_to_socket = send_to_socket::Actor::new(write); |
|
|
@ -219,8 +272,6 @@ impl Actor { |
|
|
|
.send(ConnectionStatus::Online) |
|
|
|
.expect("receiver to outlive the actor"); |
|
|
|
|
|
|
|
tracing::info!(address = %maker_addr, "Established connection to maker"); |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
@ -287,6 +338,9 @@ impl Actor { |
|
|
|
tracing::warn!(%order_id, "No active collaborative settlement"); |
|
|
|
} |
|
|
|
} |
|
|
|
wire::MakerToTaker::Hello(_) => { |
|
|
|
tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") |
|
|
|
} |
|
|
|
other => { |
|
|
|
// this one should go to the taker cfd actor
|
|
|
|
log_error!(self.maker_to_taker.send(other)); |
|
|
@ -307,7 +361,7 @@ impl Actor { |
|
|
|
|
|
|
|
if time_since_last_heartbeat > self.heartbeat_timeout { |
|
|
|
self.status_sender |
|
|
|
.send(ConnectionStatus::Offline) |
|
|
|
.send(ConnectionStatus::Offline { reason: None }) |
|
|
|
.expect("watch receiver to outlive the actor"); |
|
|
|
self.connected_state = None; |
|
|
|
} |
|
|
@ -325,7 +379,8 @@ pub async fn connect( |
|
|
|
maker_addresses: Vec<SocketAddr>, |
|
|
|
) { |
|
|
|
loop { |
|
|
|
if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { |
|
|
|
let connection_status = maker_online_status_feed_receiver.borrow().clone(); |
|
|
|
if matches!(connection_status, ConnectionStatus::Offline { .. }) { |
|
|
|
tracing::debug!("No connection to the maker"); |
|
|
|
'connect: loop { |
|
|
|
for address in &maker_addresses { |
|
|
@ -339,7 +394,7 @@ pub async fn connect( |
|
|
|
.await |
|
|
|
.expect("Taker actor to be present") |
|
|
|
{ |
|
|
|
tracing::trace!(%address, "Failed to establish connection: {:#}", e); |
|
|
|
tracing::warn!(%address, "Failed to establish connection: {:#}", e); |
|
|
|
continue; |
|
|
|
} |
|
|
|
break 'connect; |
|
|
|