Browse Source

Merge pull request #158 from comit-network/inc-connection-actor

fix-bad-api-calls
Thomas Eizinger 3 years ago
committed by GitHub
parent
commit
bfa7db680f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      daemon/src/maker.rs
  2. 63
      daemon/src/maker_inc_connections.rs

46
daemon/src/maker.rs

@ -1,20 +1,18 @@
use crate::auth::MAKER_USERNAME;
use crate::model::TakerId;
use crate::seed::Seed;
use crate::wallet::Wallet;
use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network;
use clap::Clap;
use futures::StreamExt;
use model::cfd::{Cfd, Order};
use model::WalletInfo;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use std::collections::HashMap;
use std::path::PathBuf;
use std::task::Poll;
use tokio::sync::watch;
use tokio_util::codec::FramedRead;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
@ -190,38 +188,22 @@ async fn main() -> Result<()> {
HashMap::new(),
cfd_maker_actor_inbox.clone(),
)));
tokio::spawn({
let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone();
let maker_inc_connections_address = maker_inc_connections_address.clone();
async move {
loop {
if let Ok((socket, remote_addr)) = listener.accept().await {
tracing::info!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let read = FramedRead::new(read, wire::JsonCodec::new()).map(
move |item| maker_cfd::TakerStreamMessage { taker_id, item },
);
tokio::spawn(cfd_maker_actor_inbox.clone().attach_stream(read));
let out_msg_actor = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
maker_inc_connections_address
.do_send_async(maker_inc_connections::NewTakerOnline {
taker_id,
out_msg_actor,
})
.await
.unwrap();
};
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection {
stream,
address,
}
}
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox))

63
daemon/src/maker_inc_connections.rs

@ -4,8 +4,15 @@ use crate::model::TakerId;
use crate::{maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use futures::StreamExt;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::{Actor as _, KeepRunning};
pub struct BroadcastOrder(pub Option<Order>);
@ -23,9 +30,14 @@ pub struct TakerMessage {
pub command: TakerCommand,
}
pub struct NewTakerOnline {
pub taker_id: TakerId,
pub out_msg_actor: Address<send_to_socket::Actor<wire::MakerToTaker>>,
pub enum ListenerMessage {
NewConnection {
stream: TcpStream,
address: SocketAddr,
},
Error {
source: io::Error,
},
}
pub struct Actor {
@ -90,14 +102,22 @@ impl Actor {
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
self.cfd_actor
.do_send_async(maker_cfd::NewTakerOnline { id: msg.taker_id })
.await?;
async fn handle_new_connection(&mut self, stream: TcpStream, address: SocketAddr) {
let taker_id = TakerId::default();
self.write_connections
.insert(msg.taker_id, msg.out_msg_actor);
Ok(())
tracing::info!("New taker {} connected on {}", taker_id, address);
let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::JsonCodec::new())
.map(move |item| maker_cfd::TakerStreamMessage { taker_id, item });
tokio::spawn(self.cfd_actor.clone().attach_stream(read));
let out_msg_actor = send_to_socket::Actor::new(write)
.create(None)
.spawn_global();
self.write_connections.insert(taker_id, out_msg_actor);
}
}
@ -124,9 +144,22 @@ impl Handler<TakerMessage> for Actor {
}
#[async_trait]
impl Handler<NewTakerOnline> for Actor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_taker_online(msg));
impl Handler<ListenerMessage> for Actor {
async fn handle(&mut self, msg: ListenerMessage, _ctx: &mut Context<Self>) -> KeepRunning {
match msg {
ListenerMessage::NewConnection { stream, address } => {
self.handle_new_connection(stream, address).await;
KeepRunning::Yes
}
ListenerMessage::Error { source } => {
tracing::warn!("TCP listener produced an error: {}", source);
// Maybe we should move the actual listening on the socket into here and restart the
// actor upon an error?
KeepRunning::Yes
}
}
}
}
@ -138,8 +171,8 @@ impl Message for TakerMessage {
type Result = ();
}
impl Message for NewTakerOnline {
type Result = ();
impl Message for ListenerMessage {
type Result = KeepRunning;
}
impl xtra::Actor for Actor {}

Loading…
Cancel
Save