This adjusts the process of the compute thread.

The compute thread now implements the state machine using the match
statement.
This commit is contained in:
Jason Travis Smith 2016-06-26 06:45:25 -04:00
parent bc4ed6d0bc
commit 7bd5378500
2 changed files with 151 additions and 67 deletions

View File

@ -1,4 +1,5 @@
use std::collections::binary_heap::BinaryHeap; use std::collections::binary_heap::BinaryHeap;
use std::vec::Drain;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::sync::mpsc::{Receiver, Sender, TryRecvError};
@ -186,99 +187,182 @@ impl Thread for ComputeThread
fn process(&mut self) fn process(&mut self)
{ {
let mut check_messages: bool; let mut check_messages: bool;
let mut task_completed: bool;
// Run this thread until the scheduler decides // Run this thread until the scheduler decides
// to shut it down. // to shut it down.
self.change_state(ThreadState::Idle);
while self.continue_running == true while self.continue_running == true
{ {
// No task was recently completed. match self.state
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
{ {
Some(ref mut task) => ThreadState::Starting =>
{ {
println!("Task FOUND. Calling process."); // The starting state can only lead to the
// Process the task this thread is // idle state, and does so directly.
// currently working on. self.change_state(ThreadState::Idle);
task.process(&mut self.spawner);
match task.get_state()
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
} }
None => ThreadState::Idle =>
{ {
println!("Task NOT found. Looking for new task."); // Check to see if we have a task.
// Try to get a task to work on. if self.current_task.is_some() == true
self.retrieve_task(); {
// Why are we idling? We should not get here.
debug!("Thread is idling when it should be processing.");
self.change_state(ThreadState::Processing);
}
else
{
// We need to check if there is a task that
// we should be processing.
self.retrieve_task();
if self.current_task.is_some() == true
{
// We got a new task to process,
// so switch to the Processing state.
self.change_state(ThreadState::Processing);
}
}
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
}
ThreadState::Processing =>
{
let mut is_task_finished: bool;
let mut has_spawned_children: bool;
let mut tasks: Vec<Box<TaskStateMachine>>;
let mut child_task: Option<Box<TaskStateMachine>>;
// Check to see if we have a task. The match statement
// makes this easier to handle, but because it borrows
// the current task we need to mark the action to take
// and then handle it outside of the match statement
// if it changes the current task.
is_task_finished = false;
has_spawned_children = false;
match self.current_task match self.current_task
{ {
Some(_) => Some(ref mut task) =>
{ {
println!("Task RETRIEVED."); // Process the task this thread is
// We have a task to work on, so switch to // currently working on.
// a Processing state. is_task_finished = task.process(&mut self.spawner);
self.change_state(ThreadState::Processing);
// If the task is not finished then
// check to see if any child tasks
// were spawned.
if is_task_finished == false
{
if self.spawner.has_child_tasks() == true
{
has_spawned_children = true;
}
}
} }
None => None =>
{ {
println!("Taskless. Idling thread."); // Why are we processing? We should not get here.
// If we don't have a task to process, debug!("Thread is processing when it should be idling.");
// then we may need to switch over to self.change_state(ThreadState::Idle);
// an idle state. }
match self.state }
// If there are any sibling tasks that were
// spawned then add them to the queue.
if self.spawner.has_sibling_tasks() == true
{
tasks = self.spawner.retrieve_sibling_tasks();
while tasks.is_empty() == false
{
child_task = tasks.pop();
match child_task
{ {
ThreadState::Idle => Some(task) =>
{ {
// The thread is already sitting idle. self.queue_task(task);
} }
_ => None =>
{ {
// There is nothing for this thread debug!("Queueing a sibling task that \
// to process, so mark the thread as idle. did not exist.");
self.change_state(ThreadState::Idle);
} }
} }
} }
} }
}
}
// Check to see if the task this thread // Check for any actions that need to be taken.
// was processing was completed. if is_task_finished == true
if task_completed == true {
{ // If the task was finished then it can be
println!("Task completed."); // dropped and this thread can go to an idle state.
self.current_task = None; self.current_task = None;
self.change_state(ThreadState::Idle); self.change_state(ThreadState::Idle);
}
else if has_spawned_children == true
{
// The current task is not finished so
// queue any child tasks that were spawned.
tasks = self.spawner.retrieve_child_tasks();
while tasks.is_empty() == false
{
child_task = tasks.pop();
match child_task
{
Some(task) =>
{
self.queue_task(task);
}
None =>
{
debug!("Queueing a child task that did not exist.");
}
}
}
}
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
}
ThreadState::Finished =>
{
let task: Option<Box<TaskStateMachine>>;
// The thread has been put into a finished state.
// This means that the thread should exit its loop.
info!("Shutting down thread.");
self.continue_running = false;
// Also, if the thread is currently working on
// an unfinished task then we should requeue it.
self.current_task = None;
/*
task = self.current_task;
match task
{
Some(old_task) =>
{
if old_task.get_state() != TaskState::Finished
{
self.queue_task(old_task);
}
}
None =>
{
}
}
*/
}
} }
// Sleep the thread so that other threads // Sleep the thread so that other threads
// get a chance to run // get a chance to run
::std::thread::sleep(Duration::new(0, 100)); ::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
} }
// This thread is finished.
println!("Shutting down thread.");
self.change_state(ThreadState::Finished);
} }
} }

View File

@ -74,19 +74,19 @@ impl Spawner
!self.sibling_tasks.is_empty() !self.sibling_tasks.is_empty()
} }
/// Get an Iterator over the spawned child tasks. /// Get a Vector of the spawned child tasks.
/// ///
/// This will remove all the tasks from this spawner. /// This will remove all the tasks from this spawner.
pub fn drain_child_tasks(&mut self) -> Drain<Box<TaskStateMachine>> pub fn retrieve_child_tasks(&mut self) -> Vec<Box<TaskStateMachine>>
{ {
self.child_tasks.drain(..) self.child_tasks.drain(..).collect()
} }
/// Get an Iterator over the spawned sibling tasks. /// Get a Vector of the spawned sibling tasks.
/// ///
/// This will remove all the tasks from this spawner. /// This will remove all the tasks from this spawner.
pub fn drain_sibling_tasks(&mut self) -> Drain<Box<TaskStateMachine>> pub fn retrieve_sibling_tasks(&mut self) -> Vec<Box<TaskStateMachine>>
{ {
self.sibling_tasks.drain(..) self.sibling_tasks.drain(..).collect()
} }
} }