Thomas Eizinger
3 years ago
committed by
GitHub
7 changed files with 112 additions and 67 deletions
@ -0,0 +1,35 @@ |
|||
use futures::SinkExt; |
|||
use serde::Serialize; |
|||
use std::fmt; |
|||
use tokio::net::tcp::OwnedWriteHalf; |
|||
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; |
|||
use xtra::{Handler, Message}; |
|||
|
|||
pub struct Actor { |
|||
write: FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>, |
|||
} |
|||
|
|||
impl Actor { |
|||
pub fn new(write: OwnedWriteHalf) -> Self { |
|||
Self { |
|||
write: FramedWrite::new(write, LengthDelimitedCodec::new()), |
|||
} |
|||
} |
|||
} |
|||
|
|||
#[async_trait::async_trait] |
|||
impl<T> Handler<T> for Actor |
|||
where |
|||
T: Message<Result = ()> + Serialize + fmt::Display, |
|||
{ |
|||
async fn handle(&mut self, message: T, ctx: &mut xtra::Context<Self>) { |
|||
let bytes = serde_json::to_vec(&message).expect("serialization should never fail"); |
|||
|
|||
if let Err(e) = self.write.send(bytes.into()).await { |
|||
tracing::error!("Failed to write message {} to socket: {}", message, e); |
|||
ctx.stop(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
impl xtra::Actor for Actor {} |
@ -1,31 +0,0 @@ |
|||
use futures::{Future, SinkExt}; |
|||
use serde::Serialize; |
|||
use tokio::net::tcp::OwnedWriteHalf; |
|||
use tokio::sync::mpsc; |
|||
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; |
|||
|
|||
pub fn new<T>(write: OwnedWriteHalf) -> (impl Future<Output = ()>, mpsc::UnboundedSender<T>) |
|||
where |
|||
T: Serialize, |
|||
{ |
|||
let (sender, mut receiver) = mpsc::unbounded_channel::<T>(); |
|||
|
|||
let actor = async move { |
|||
let mut framed_write = FramedWrite::new(write, LengthDelimitedCodec::new()); |
|||
|
|||
while let Some(message) = receiver.recv().await { |
|||
match framed_write |
|||
.send(serde_json::to_vec(&message).unwrap().into()) |
|||
.await |
|||
{ |
|||
Ok(_) => {} |
|||
Err(_) => { |
|||
tracing::error!("TCP connection error"); |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
}; |
|||
|
|||
(actor, sender) |
|||
} |
Loading…
Reference in new issue