From d32f1e27a331b541ec64f09ef78dfcd4c47a5bd0 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 30 Jan 2024 14:42:57 -0500 Subject: [PATCH 1/2] Ignore client messages after stopping the IO task --- upstairs/src/client.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index f4e93b250..42f7ccf5c 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -285,14 +285,22 @@ impl DownstairsClient { /// must the select expressions be cancel safe, but the **bodies** must also /// be cancel-safe. This is why we simply return a single value in the body /// of each statement. + /// + /// This function will wait forever if we have asked for the client task to + /// stop, so it should only be called in a higher-level `select!`. pub(crate) async fn select(&mut self) -> ClientAction { tokio::select! { - d = self.client_task.client_response_rx.recv() => { + d = self.client_task.client_response_rx.recv(), + if self.client_task.client_stop_tx.is_some() => + { match d { Some(c) => c.into(), None => ClientAction::ChannelClosed, } } + _ = futures::future::pending() => { + unreachable!() + } } } From fc1f50fc70aabef198327b5dc29d6df851bd835b Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 30 Jan 2024 15:41:26 -0500 Subject: [PATCH 2/2] Propagate non-Response actions even when stopping the client --- upstairs/src/client.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index 42f7ccf5c..8b3c748ba 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -289,17 +289,17 @@ impl DownstairsClient { /// This function will wait forever if we have asked for the client task to /// stop, so it should only be called in a higher-level `select!`. pub(crate) async fn select(&mut self) -> ClientAction { - tokio::select! { - d = self.client_task.client_response_rx.recv(), - if self.client_task.client_stop_tx.is_some() => + loop { + let out = match self.client_task.client_response_rx.recv().await { + Some(c) => c.into(), + None => break ClientAction::ChannelClosed, + }; + // Ignore client responses if we have told the client to exit (we + // still propagate other ClientAction variants, e.g. TaskStopped). + if self.client_task.client_stop_tx.is_some() + || !matches!(out, ClientAction::Response(..)) { - match d { - Some(c) => c.into(), - None => ClientAction::ChannelClosed, - } - } - _ = futures::future::pending() => { - unreachable!() + break out; } } }