diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 5ce4448..40f0f95 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,7 +1,8 @@ +use crate::tokio_ext::FutureExt; use crate::{log_error, noise, send_to_socket, wire}; use anyhow::Result; use futures::future::RemoteHandle; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -104,11 +105,10 @@ impl Actor { let this = ctx.address().expect("self to be alive"); tokio::spawn(this.attach_stream(read)); - let (pulse_future, pulse_remote_handle) = ctx + let pulse_remote_handle = ctx .notify_interval(self.timeout, || MeasurePulse) .expect("we just started") - .remote_handle(); - tokio::spawn(pulse_future); + .spawn_with_handle(); self.connected_state = Some(ConnectedState { last_heartbeat: SystemTime::now(), diff --git a/daemon/src/tokio_ext.rs b/daemon/src/tokio_ext.rs index 4a7ff52..f5783b2 100644 --- a/daemon/src/tokio_ext.rs +++ b/daemon/src/tokio_ext.rs @@ -1,3 +1,5 @@ +use futures::future::RemoteHandle; +use futures::FutureExt as _; use std::fmt; use std::future::Future; use std::time::Duration; @@ -17,6 +19,12 @@ where pub trait FutureExt: Future + Sized { fn timeout(self, duration: Duration) -> Timeout; + + /// Spawn the future on a task in the runtime and return a RemoteHandle to it. + /// The task will be stopped when the handle gets dropped. + fn spawn_with_handle(self) -> RemoteHandle + where + Self: Future + Send + 'static; } impl FutureExt for F @@ -26,4 +34,13 @@ where fn timeout(self, duration: Duration) -> Timeout { timeout(duration, self) } + + fn spawn_with_handle(self) -> RemoteHandle<()> + where + Self: Future + Send + 'static, + { + let (future, handle) = self.remote_handle(); + tokio::spawn(future); + handle + } }