Browse Source

Extend our futures trait for better ergonomics with remote handles

Simplify dealing with spawning a task on a runtime that returns a remote a
handle that can be used to stop it.
new-http-api
Mariusz Klochowicz 3 years ago
parent
commit
143cc09e65
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 8
      daemon/src/connection.rs
  2. 17
      daemon/src/tokio_ext.rs

8
daemon/src/connection.rs

@ -1,7 +1,8 @@
use crate::tokio_ext::FutureExt;
use crate::{log_error, noise, send_to_socket, wire};
use anyhow::Result;
use futures::future::RemoteHandle;
use futures::{FutureExt, StreamExt};
use futures::StreamExt;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
@ -104,11 +105,10 @@ impl Actor {
let this = ctx.address().expect("self to be alive");
tokio::spawn(this.attach_stream(read));
let (pulse_future, pulse_remote_handle) = ctx
let pulse_remote_handle = ctx
.notify_interval(self.timeout, || MeasurePulse)
.expect("we just started")
.remote_handle();
tokio::spawn(pulse_future);
.spawn_with_handle();
self.connected_state = Some(ConnectedState {
last_heartbeat: SystemTime::now(),

17
daemon/src/tokio_ext.rs

@ -1,3 +1,5 @@
use futures::future::RemoteHandle;
use futures::FutureExt as _;
use std::fmt;
use std::future::Future;
use std::time::Duration;
@ -17,6 +19,12 @@ where
pub trait FutureExt: Future + Sized {
fn timeout(self, duration: Duration) -> Timeout<Self>;
/// Spawn the future on a task in the runtime and return a RemoteHandle to it.
/// The task will be stopped when the handle gets dropped.
fn spawn_with_handle(self) -> RemoteHandle<Self::Output>
where
Self: Future<Output = ()> + Send + 'static;
}
impl<F> FutureExt for F
@ -26,4 +34,13 @@ where
fn timeout(self, duration: Duration) -> Timeout<F> {
timeout(duration, self)
}
fn spawn_with_handle(self) -> RemoteHandle<()>
where
Self: Future<Output = ()> + Send + 'static,
{
let (future, handle) = self.remote_handle();
tokio::spawn(future);
handle
}
}

Loading…
Cancel
Save