//! Rust Challenge mod commands; use std::io::Write; use std::pin::Pin; use std::future::Future; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, watch}; use tokio::time::{Duration, Interval}; use crate::commands::Command; type IoResult = Result; /// Reads input from the user until the termination signal is /// received. It will send command messages every time it reads valid input. fn read_user_input(command_sender: mpsc::Sender, term_sender: watch::Sender) -> IoResult<()> { let mut running: bool = true; let mut buffer = String::new(); // Loop and read the user input. while running { buffer.clear(); std::io::stdin().read_line(&mut buffer)?; std::io::stdout().flush().unwrap(); let input = buffer.trim(); match input { // A termination method wasn't specified in the document, so // this will terminate when either stop, exit, or quit is entered. "exit" | "stop" | "quit" => { println!("Exiting application."); running = false; term_sender.send_replace(running); } _ => { // Check to see if we were given a number. match input.parse::() { Ok(seconds) => { // Here we are just handling the different possible numbers // passed in. -1 cancels commands, less than -1 is ignored // with a message, and greated than -1 is turned into a // propulsion command. if seconds == -1 { match command_sender.blocking_send(Command::Cancel) { Ok(_) => { println!("Cancelling any outstanding commands.") } Err(e) => println!("Failed to send command: {}", e) } } else if seconds < -1 { println!("All propulsion delay times are given in \ seconds within the range of [0, {}]", u32::MAX); println!("A value of -1 will cancel any current \ propulsion commands."); } else { let larger_seconds: u64 = seconds as u64; let delay_duration: Duration = Duration::from_secs(larger_seconds); match command_sender.try_send(Command::Propulsion { delay: delay_duration }) { Ok(_) => { println!("Firing the engines in {} seconds.", seconds) } Err(e) => println!("Failed to send command: {}", e) } } } Err(e) => { println!("Unable to parse the given input into seconds."); println!("Please specify only seconds until the time to \ fire the engines."); } } } } } Ok(()) } /// fn maybe_tick<'a>(interval: Option<&'a mut Interval>) -> Pin + Send + 'a>> { match interval { Some(interval) => Box::pin(async move { interval.tick().await; () }), None => Box::pin(std::future::pending()) } } async fn process_commands(mut command_receiver: mpsc::Receiver, mut term_receiver: watch::Receiver) -> IoResult<()> { let mut propulsion_interval: Option = None; let mut stdout = tokio::io::stdout(); let mut running: bool = true; while running { tokio::select! { Some(command) = command_receiver.recv() => { match command { Command::Cancel => { stdout.write_all(b"Received: Cancel\n").await?; stdout.flush().await?; propulsion_interval = None; } Command::Propulsion { delay } => { let mut buffer: Vec = Vec::new(); writeln!(&mut buffer, "Received: {:?} delay", delay); stdout.write_all(&buffer).await?; stdout.flush().await?; propulsion_interval = Some(tokio::time::interval(delay)); // Skip the first immediate tick if let Some(interval) = propulsion_interval.as_mut() { interval.tick().await; // skip first tick } } } } _ = maybe_tick(propulsion_interval.as_mut()) => { stdout.write_all(b"firing now!\n").await?; stdout.flush().await?; } _ = term_receiver.changed() => { stdout.write_all(b"Communication task received shutdown message.").await?; stdout.flush().await?; running = *term_receiver.borrow_and_update(); } } } Ok(()) } /// Program entry point. #[tokio::main] async fn main() -> IoResult<()> { // The channel that will be used to send satellite commands between tasks. let (command_sender, mut command_receiver) = mpsc::channel::(100); // The channel that will be used to signal termination of the program. // True means the program is running. False means it has been terminated. // // This could be done as a satellite command since the program is driven by // user interaction and it all happens from the input_task, but this is a // program signal so I prefer to keep it seperate from the command messages. // It also allows for moving termination control to another task, say from // Ctrl+C, if the program was changed to use TCP or something else. let (term_sender, mut term_receiver) = watch::channel(true); // Spawn a task to handle reading input from the user. We use a thread here // because tokio recommends it for the blocking read calls for interactive // user input. let input_task = tokio::task::spawn_blocking(move || { read_user_input(command_sender, term_sender) }); // Spawn a task to handle simulating the satellite. let sim_task = tokio::spawn(async move { process_commands(command_receiver, term_receiver).await }); let (sim_result, input_result) = tokio::join!(sim_task, input_task); sim_result?; input_result??; println!("Shutdown complete."); Ok(()) }