3

Problem

I'm working with asynchronous code in Rust and trying to handle user input from stdin in a cancel-safe manner. My goal is to be able to detect when a user presses a key (or hits enter) to interrupt a long-running async operation, but I want to ensure that the async operation is cancellable without blocking the executor.

In the following example, I use select! to wait for either the completion of a future (fut) or a user input from stdin. However, the issue I'm running into is that if the user doesn't provide input, stdin seems to block the executor, preventing the future from completing when it otherwise should.

So far I have tried

  • Using mspc channels to cancel the task
  • Dropping stdin
  • Similar variations of spawning blocking/non_blocking tasks

And considered writing the new line from another process

use std::future::Future;

use console::style;
use spinoff::{spinners, Spinner};
use tokio::{io::{self, AsyncBufReadExt, BufReader}, select};

pub async fn interact_with_cancel<R>(prompt: &str, fut: impl Future<Output = R>) -> Option<R> {
    let mut spinner = Spinner::new(
        spinners::Moon, 
        format!("{} {}", prompt, style("| hit enter to cancel").dim()), 
        None
    );

    let mut stdin = BufReader::new(io::stdin()).lines();
    
    select! {
        // next_line is not cleaned up once fut returns
        // and instead prevents the function from 
        // returning until it receives input
        _ = stdin.next_line() => {
            spinner.clear();

            return None;
        },
        result = fut => {
            spinner.clear();

            return Some(result)
        }
    };
}

#[cfg(test)]
mod test {
    use std::time::Duration;

    use super::*;

    #[tokio::test]
    async fn test_interact_with_cancel() {
        let _ = interact_with_cancel("testing", tokio::time::sleep(Duration::from_secs(2)))
            .await;
    }
}

2 Answers 2

1

Reading tokio::io::stdin documentation (emphasis mine):

This handle is best used for non-interactive uses, such as when a file is piped into the application. For technical reasons, stdin is implemented by using an ordinary blocking read on a separate thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang until the user presses enter.

For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.

So, among your trials, have you tried explicitly implementing a thread, which would be responsible of user interaction, by:

  • displaying current task/progress (maybe through a mpsc::Receiver<String>)
  • capturing user inputs, and sending interruption signal through a mpsc::channel::Sender<MyCommand> that is listened by interruptible tasks?
Sign up to request clarification or add additional context in comments.

Comments

0

Overview

After experimenting with several approaches, I found that spawning a child process to handle stdin input provided the most straightforward solution for my use case. The child process allows me to handle user input using read_line and a spinner for rendering, while still keeping the async operations cancellable.

Here’s a high-level overview of my solution:

  • I spawned a child process to manage the stdin input.
  • The child process was responsible for reading the input while the main async task was able to continue executing without being blocked.
  • The task was cancellable by other futures because the stdin read operation in the child process did not block the main executor.
  • This method avoided the blocking behavior I encountered with tokio::io::stdin, where blocking reads would prevent the async future from completing until user input was received.

Child process

use std::env;

use console::style;
use spinoff::{spinners, Spinner};

fn main() {
    let args: Vec<String> = env::args().collect();
    let prompt = args.get(1).expect("Expected prompt argument");

    let mut spinner = Spinner::new(
        spinners::Moon, 
        format!("{} {}", prompt, style("| hit enter to cancel").dim()), 
        None
    );

    let mut input = String::new();
    std::io::stdin().read_line(&mut input).expect("Failed to read input");
    spinner.clear();
}

Main process

pub async fn interact_with_cancel<R>(prompt: &str, fut: impl Future<Output = R>) -> Option<R> {
    let binary_path = "/path/to/target/debug/detect_enter";

    let mut child = Command::new(binary_path)
        .arg(prompt)
        .spawn()
        .expect("Failed to start detect_enter process");

    let process_id = child.id();

    let exit_handle = tokio::spawn(async move {
        child.wait().unwrap();
    });

    select! {
        _ = exit_handle => return None,
        fut = fut => {
            let _ = Command::new("kill")
                .args(["-9", &process_id.to_string()])
                .spawn()
                .expect("Failed to kill process on Unix");

            return Some(fut)
        }
    };
}

This solution requires handling stdin in a child process, which is OS-specific. If you're targeting multiple platforms, you will need to manually handle cross-platform implementations. The process management commands, like kill, are platform-dependent, so this approach may need modifications for Windows or other operating systems.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.