Browse Source

Close connections with takers that don't adhere to the message format

refactor/no-log-handler
Thomas Eizinger 4 years ago
parent
commit
8c8cbe759c
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 2
      daemon/src/forward_only_ok.rs
  2. 20
      daemon/src/maker_inc_connections.rs
  3. 5
      daemon/src/send_to_socket.rs

2
daemon/src/forward_only_ok.rs

@ -39,7 +39,7 @@ where
let ok = match result {
Ok(ok) => ok,
Err(e) => {
tracing::error!("Received error: {}", e);
tracing::error!("Stopping forwarding due to error: {}", e);
return KeepRunning::StopSelf;
}

20
daemon/src/maker_inc_connections.rs

@ -176,17 +176,27 @@ impl Actor {
.map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message);
let (out_msg_actor_address, mut out_msg_actor_context) = xtra::Context::new(None);
let forward_to_cfd = forward_only_ok::Actor::new(self.taker_msg_channel.clone_channel())
.create(None)
.spawn_global();
tokio::spawn(forward_to_cfd.attach_stream(read));
// only allow outgoing messages while we are successfully reading incoming ones
tokio::spawn(async move {
let mut actor = send_to_socket::Actor::new(write);
let out_msg_actor = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read))
.await;
tracing::error!("Closing connection to taker {}", taker_id);
actor.shutdown().await;
});
self.write_connections.insert(taker_id, out_msg_actor);
self.write_connections
.insert(taker_id, out_msg_actor_address);
let _ = self
.new_taker_channel

5
daemon/src/send_to_socket.rs

@ -2,6 +2,7 @@ use crate::wire::{self, JsonCodec};
use futures::SinkExt;
use serde::Serialize;
use std::fmt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio_util::codec::FramedWrite;
use xtra::{Handler, Message};
@ -16,6 +17,10 @@ impl<T> Actor<T> {
write: FramedWrite::new(write, JsonCodec::default()),
}
}
pub async fn shutdown(self) {
let _ = self.write.into_inner().shutdown().await;
}
}
#[async_trait::async_trait]

Loading…
Cancel
Save