From 7bd5378500d0b4a39dc19da29db08075f2d69dc7 Mon Sep 17 00:00:00 2001 From: Jason Travis Smith Date: Sun, 26 Jun 2016 06:45:25 -0400 Subject: [PATCH] This adjusts the process of the compute thread. The compute thread now implements the state machine using the match statement. --- src/compute_thread.rs | 206 +++++++++++++++++++++++++++++------------- src/spawner.rs | 12 +-- 2 files changed, 151 insertions(+), 67 deletions(-) diff --git a/src/compute_thread.rs b/src/compute_thread.rs index f14c339..07ab14c 100644 --- a/src/compute_thread.rs +++ b/src/compute_thread.rs @@ -1,4 +1,5 @@ use std::collections::binary_heap::BinaryHeap; +use std::vec::Drain; use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; @@ -186,99 +187,182 @@ impl Thread for ComputeThread fn process(&mut self) { let mut check_messages: bool; - let mut task_completed: bool; // Run this thread until the scheduler decides // to shut it down. - self.change_state(ThreadState::Idle); while self.continue_running == true { - // No task was recently completed. - task_completed = false; - - // Make sure that this thread has a Task - // to currently work on. - match self.current_task + match self.state { - Some(ref mut task) => + ThreadState::Starting => { - println!("Task FOUND. Calling process."); - // Process the task this thread is - // currently working on. - task.process(&mut self.spawner); - - match task.get_state() - { - TaskState::Finished => - { - task_completed = true; - } - - _ => - { - } - } + // The starting state can only lead to the + // idle state, and does so directly. + self.change_state(ThreadState::Idle); } - None => + ThreadState::Idle => { - println!("Task NOT found. Looking for new task."); - // Try to get a task to work on. - self.retrieve_task(); + // Check to see if we have a task. + if self.current_task.is_some() == true + { + // Why are we idling? We should not get here. + debug!("Thread is idling when it should be processing."); + self.change_state(ThreadState::Processing); + } + else + { + // We need to check if there is a task that + // we should be processing. + self.retrieve_task(); + if self.current_task.is_some() == true + { + // We got a new task to process, + // so switch to the Processing state. + self.change_state(ThreadState::Processing); + } + } + + // Check to see if this thread should be shutdown. + self.process_shutdown_messages(); + } + + ThreadState::Processing => + { + let mut is_task_finished: bool; + let mut has_spawned_children: bool; + let mut tasks: Vec>; + let mut child_task: Option>; + + // Check to see if we have a task. The match statement + // makes this easier to handle, but because it borrows + // the current task we need to mark the action to take + // and then handle it outside of the match statement + // if it changes the current task. + is_task_finished = false; + has_spawned_children = false; match self.current_task { - Some(_) => + Some(ref mut task) => { - println!("Task RETRIEVED."); - // We have a task to work on, so switch to - // a Processing state. - self.change_state(ThreadState::Processing); + // Process the task this thread is + // currently working on. + is_task_finished = task.process(&mut self.spawner); + + // If the task is not finished then + // check to see if any child tasks + // were spawned. + if is_task_finished == false + { + if self.spawner.has_child_tasks() == true + { + has_spawned_children = true; + } + } } None => { - println!("Taskless. Idling thread."); - // If we don't have a task to process, - // then we may need to switch over to - // an idle state. - match self.state + // Why are we processing? We should not get here. + debug!("Thread is processing when it should be idling."); + self.change_state(ThreadState::Idle); + } + } + + // If there are any sibling tasks that were + // spawned then add them to the queue. + if self.spawner.has_sibling_tasks() == true + { + tasks = self.spawner.retrieve_sibling_tasks(); + while tasks.is_empty() == false + { + child_task = tasks.pop(); + match child_task { - ThreadState::Idle => + Some(task) => { - // The thread is already sitting idle. + self.queue_task(task); } - _ => + None => { - // There is nothing for this thread - // to process, so mark the thread as idle. - self.change_state(ThreadState::Idle); + debug!("Queueing a sibling task that \ + did not exist."); } } } } - } - } - // Check to see if the task this thread - // was processing was completed. - if task_completed == true - { - println!("Task completed."); - self.current_task = None; - self.change_state(ThreadState::Idle); + // Check for any actions that need to be taken. + if is_task_finished == true + { + // If the task was finished then it can be + // dropped and this thread can go to an idle state. + self.current_task = None; + self.change_state(ThreadState::Idle); + } + else if has_spawned_children == true + { + // The current task is not finished so + // queue any child tasks that were spawned. + tasks = self.spawner.retrieve_child_tasks(); + while tasks.is_empty() == false + { + child_task = tasks.pop(); + match child_task + { + Some(task) => + { + self.queue_task(task); + } + + None => + { + debug!("Queueing a child task that did not exist."); + } + } + } + } + + // Check to see if this thread should be shutdown. + self.process_shutdown_messages(); + } + + ThreadState::Finished => + { + let task: Option>; + + // The thread has been put into a finished state. + // This means that the thread should exit its loop. + info!("Shutting down thread."); + self.continue_running = false; + + // Also, if the thread is currently working on + // an unfinished task then we should requeue it. + self.current_task = None; + /* + task = self.current_task; + match task + { + Some(old_task) => + { + if old_task.get_state() != TaskState::Finished + { + self.queue_task(old_task); + } + } + + None => + { + } + } + */ + } } // Sleep the thread so that other threads // get a chance to run ::std::thread::sleep(Duration::new(0, 100)); - - // Check to see if this thread should be shutdown. - self.process_shutdown_messages(); } - - // This thread is finished. - println!("Shutting down thread."); - self.change_state(ThreadState::Finished); } } diff --git a/src/spawner.rs b/src/spawner.rs index 4571df4..6a58d16 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -74,19 +74,19 @@ impl Spawner !self.sibling_tasks.is_empty() } - /// Get an Iterator over the spawned child tasks. + /// Get a Vector of the spawned child tasks. /// /// This will remove all the tasks from this spawner. - pub fn drain_child_tasks(&mut self) -> Drain> + pub fn retrieve_child_tasks(&mut self) -> Vec> { - self.child_tasks.drain(..) + self.child_tasks.drain(..).collect() } - /// Get an Iterator over the spawned sibling tasks. + /// Get a Vector of the spawned sibling tasks. /// /// This will remove all the tasks from this spawner. - pub fn drain_sibling_tasks(&mut self) -> Drain> + pub fn retrieve_sibling_tasks(&mut self) -> Vec> { - self.sibling_tasks.drain(..) + self.sibling_tasks.drain(..).collect() } }