Most of the new task processing is now complete.

The task states and how to spawn new tasks has all been sorted out.
All that is left at this point is to handle the thread portion of task
management and to determine how to get messages from the new child
task's completion back to the parent task.
This commit is contained in:
Jason Travis Smith
2016-06-25 05:26:10 -04:00
parent 5d4e3ab9fa
commit bc4ed6d0bc
17 changed files with 632 additions and 91 deletions

View File

@ -1,8 +1,10 @@
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use ::spawner::Spawner;
use ::task::Task;
use ::task_state::TaskState;
use ::task_state_machine::TaskStateMachine;
use ::thread::Thread;
use ::thread_state::ThreadState;
@ -15,7 +17,7 @@ pub struct BlockingThread
state: ThreadState,
///
current_task: Box<Task>,
current_task: Box<TaskStateMachine>,
///
shutdown_receiver: Receiver<bool>,
@ -24,7 +26,10 @@ pub struct BlockingThread
state_sender: Sender<ThreadState>,
///
continue_running: bool
continue_running: bool,
///
spawner: Spawner
}
@ -32,7 +37,7 @@ pub struct BlockingThread
impl BlockingThread
{
///
pub fn new(task: Box<Task>,
pub fn new(task: Box<TaskStateMachine>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> BlockingThread
@ -43,7 +48,8 @@ impl BlockingThread
current_task: task,
shutdown_receiver: shutdown_r,
state_sender: state_s,
continue_running: true
continue_running: true,
spawner: Spawner::new()
}
}
@ -130,7 +136,7 @@ impl Thread for BlockingThread
{
// Process the task this thread is
// currently working on.
self.current_task.process();
self.current_task.process(&mut self.spawner);
match self.current_task.get_state()
{

62
src/child_task.rs Normal file
View File

@ -0,0 +1,62 @@
use ::mortality::Mortality;
use ::priority::Priority;
use ::spawner::Spawner;
use ::task::Task;
/// A Task created from another Task that is required
/// to be completed before the parent task can finish.
pub struct ChildTask<T>
{
///
task: T
}
impl<T> ChildTask<T> where T: Task
{
/// Decorates a task with the required portions
/// needed for children of other Tasks.
pub fn new(decorated_task: T) -> ChildTask<T>
{
ChildTask
{
task: decorated_task
}
}
}
impl<T> Task for ChildTask<T> where T: Task
{
fn get_name(&self) -> &str
{
// Just return the contained Task's name.
self.task.get_name()
}
fn get_mortality(&self) -> Mortality
{
// Just return the contained Task's Mortality.
self.task.get_mortality()
}
fn get_priority(&self) -> Priority
{
// Just return the contained Task's Priority.
self.task.get_priority()
}
fn reset(&mut self)
{
// Reset the contained Task.
self.task.reset();
}
fn process(&mut self, spawner: &mut Spawner) -> bool
{
// Process the contained Task.
self.task.process(spawner)
}
}

View File

@ -4,8 +4,10 @@ use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use ::spawner::Spawner;
use ::task::Task;
use ::task_state::TaskState;
use ::task_state_machine::TaskStateMachine;
use ::thread::Thread;
use ::thread_state::ThreadState;
@ -18,10 +20,10 @@ pub struct ComputeThread
state: ThreadState,
///
current_task: Option<Box<Task>>,
current_task: Option<Box<TaskStateMachine>>,
///
task_queue: Arc<Mutex<BinaryHeap<Box<Task>>>>,
task_queue: Arc<Mutex<BinaryHeap<Box<TaskStateMachine>>>>,
///
shutdown_receiver: Receiver<bool>,
@ -31,6 +33,9 @@ pub struct ComputeThread
///
continue_running: bool,
///
spawner: Spawner
}
@ -38,7 +43,7 @@ pub struct ComputeThread
impl ComputeThread
{
///
pub fn new(queue: Arc<Mutex<BinaryHeap<Box<Task>>>>,
pub fn new(queue: Arc<Mutex<BinaryHeap<Box<TaskStateMachine>>>>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> ComputeThread
@ -50,7 +55,8 @@ impl ComputeThread
task_queue: queue,
shutdown_receiver: shutdown_r,
state_sender: state_s,
continue_running: true
continue_running: true,
spawner: Spawner::new()
}
}
@ -113,7 +119,7 @@ impl ComputeThread
}
/// Queues a task to be processed.
fn queue_task(&mut self, task: Box<Task>)
fn queue_task(&mut self, task: Box<TaskStateMachine>)
{
// Just add the task to the queue.
match self.task_queue.lock()
@ -196,9 +202,10 @@ impl Thread for ComputeThread
{
Some(ref mut task) =>
{
println!("Task FOUND. Calling process.");
// Process the task this thread is
// currently working on.
task.process();
task.process(&mut self.spawner);
match task.get_state()
{
@ -215,12 +222,14 @@ impl Thread for ComputeThread
None =>
{
println!("Task NOT found. Looking for new task.");
// Try to get a task to work on.
self.retrieve_task();
match self.current_task
{
Some(_) =>
{
println!("Task RETRIEVED.");
// We have a task to work on, so switch to
// a Processing state.
self.change_state(ThreadState::Processing);
@ -228,6 +237,7 @@ impl Thread for ComputeThread
None =>
{
println!("Taskless. Idling thread.");
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.

View File

@ -5,18 +5,33 @@ extern crate chrono;
mod scheduler;
mod mortality;
mod priority;
pub mod monitor;
mod spawner;
pub mod task_state;
mod task;
mod taskable;
mod task_container;
mod task_state;
mod blocking_thread;
mod compute_thread;
mod thread;
mod thread_data;
mod thread_state;
pub mod child_task;
pub mod task_container;
pub mod task_state_machine;
pub mod thread_data;
pub mod thread_state;
pub mod thread;
pub mod blocking_thread;
pub mod compute_thread;
mod scheduler;
pub use self::mortality::Mortality;
pub use self::priority::Priority;
pub use self::spawner::Spawner;
pub use self::task::Task;
pub use self::scheduler::Scheduler;
pub use self::taskable::Taskable;

18
src/monitor.rs Normal file
View File

@ -0,0 +1,18 @@
///
pub struct Monitor
{
g: i8
}
impl Monitor
{
pub fn new() -> Monitor
{
Monitor
{
g: 0i8
}
}
}

56
src/mortality.rs Normal file
View File

@ -0,0 +1,56 @@
/// Tasks can either be removed from the processing queue or added
/// back to the queue when they are finished processing.
#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub enum Mortality
{
/// A task that will be reset when finished so that
/// instead of dying, it is reborn.
Immortal,
/// A task that will be done processing when it is finished.
Mortal
}
impl Mortality
{
/// Get a str representation of this variant.
pub fn to_str(&self) -> &'static str
{
match *self
{
Mortality::Immortal =>
{
"Immortal"
}
Mortality::Mortal =>
{
"Mortal"
}
}
}
/// Get a String representation of this variant.
pub fn to_string(&self) -> String
{
String::from(self.to_str())
}
}
impl ::std::fmt::Debug for Mortality
{
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result
{
write!(f, "{}", self.to_str())
}
}
impl ::std::fmt::Display for Mortality
{
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result
{
write!(f, "{}", self.to_str())
}
}

82
src/priority.rs Normal file
View File

@ -0,0 +1,82 @@
/// A set of priority levels for tasks.
#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub enum Priority
{
/// The lowest priority level. This is for
/// tasks that you don't care when they finish.
Lowest,
/// A low priority task. This is for tasks that are not
/// as important as normal tasks.
Low,
/// A normal priority task. This is for most tasks to use.
Normal,
/// A high priority task. This is for tasks that need to make
/// sure they are scheduled to be done prior to most other tasks.
High,
/// The highest priority task. This is for tasks that
/// must be run right away.
Highest
}
impl Priority
{
/// Get a str representation of this variant.
pub fn to_str(&self) -> &'static str
{
match *self
{
Priority::Lowest =>
{
"Lowest"
}
Priority::Low =>
{
"Low"
}
Priority::Normal =>
{
"Normal"
}
Priority::High =>
{
"High"
}
Priority::Highest =>
{
"Highest"
}
}
}
/// Get a String representation of this variant.
pub fn to_string(&self) -> String
{
String::from(self.to_str())
}
}
impl ::std::fmt::Debug for Priority
{
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result
{
write!(f, "{}", self.to_str())
}
}
impl ::std::fmt::Display for Priority
{
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result
{
write!(f, "{}", self.to_str())
}
}

View File

@ -9,7 +9,7 @@ use std::vec::Vec;
use chrono::MonotonicTime;
use ::task::Task;
use ::taskable::Taskable;
use ::task_state_machine::TaskStateMachine;
use ::task_container::TaskContainer;
use ::blocking_thread::BlockingThread;
use ::compute_thread::ComputeThread;
@ -42,7 +42,7 @@ pub struct Scheduler
blocking_threads: Vec<ThreadData>,
/// The main task queue that threads can use to process tasks.
task_queue: Arc<Mutex<BinaryHeap<Box<Task>>>>
task_queue: Arc<Mutex<BinaryHeap<Box<TaskStateMachine>>>>
}
@ -105,7 +105,8 @@ impl Scheduler
}
/// Queues a task to be processed.
pub fn queue_task<T>(&mut self, task: T) where T: Taskable + 'static
pub fn queue_task<T>(&mut self, task: T)
where T: Task + 'static
{
let container: TaskContainer<T>;
@ -128,7 +129,8 @@ impl Scheduler
}
/// Queues a task, that is known to block, to be processed.
pub fn queue_blocking_task<T>(&mut self, task: T) where T: Taskable+ 'static
pub fn queue_blocking_task<T>(&mut self, task: T)
where T: Task + 'static
{
let data: ThreadData;
let container: TaskContainer<T>;
@ -255,7 +257,8 @@ impl Scheduler
// Calculate how many threads we would need to
// process all the tasks in our queue if each
// task was given its own thread.
println!("task_count: {}\navailable_threads: {}", task_count, available_threads);
println!("task_count: {}\navailable_threads: {}",
task_count, available_threads);
desired_threads = 0u64;
if task_count > available_threads
{
@ -283,7 +286,6 @@ impl Scheduler
///
fn trim_compute_threads(&mut self)
{
}
///
@ -330,7 +332,7 @@ impl Scheduler
ThreadData::new(join_handle, shutdown_channel.0, state_channel.1)
}
fn spawn_blocking_thread(&self, task: Box<Task>) -> ThreadData
fn spawn_blocking_thread(&self, task: Box<TaskStateMachine>) -> ThreadData
{
let shutdown_channel: (Sender<bool>, Receiver<bool>);
let state_channel: (Sender<ThreadState>, Receiver<ThreadState>);

92
src/spawner.rs Normal file
View File

@ -0,0 +1,92 @@
use std::vec::Drain;
use ::child_task::ChildTask;
use ::monitor::Monitor;
use ::task::Task;
use ::task_container::TaskContainer;
use ::task_state_machine::TaskStateMachine;
/// This structure contains the child and sibling Tasks
/// that a task has spawned while it was being processed.
pub struct Spawner
{
/// The child tasks that have been spawned.
///
/// Please use the functions so that they
/// can be properly wrapped.
child_tasks: Vec<Box<TaskStateMachine>>,
/// The sibling tasks that have been spawned.
///
/// Please use the functions so that they
/// can be properly wrapped.
sibling_tasks: Vec<Box<TaskStateMachine>>
}
impl Spawner
{
/// Creates a new Spawner structure.
pub fn new() -> Spawner
{
// Create the new Spawner.
Spawner
{
child_tasks: Vec::new(),
sibling_tasks: Vec::new()
}
}
/// Spawn a new child task.
pub fn spawn_child_task<T>(&mut self, child: T)
where T: Task + 'static
{
let child_task: ChildTask<T>;
let container: TaskContainer<ChildTask<T>>;
child_task = ChildTask::new(child);
container = TaskContainer::new(child_task);
self.child_tasks.push(Box::new(container));
}
/// Spawn a new sibling task.
pub fn spawn_sibling_task<T>(&mut self, sibling: T)
where T: Task + 'static
{
let container: TaskContainer<T>;
container = TaskContainer::new(sibling);
self.sibling_tasks.push(Box::new(container));
}
/// Returns true if child tasks were spawned; Otherwise, false.
pub fn has_child_tasks(&self) -> bool
{
!self.child_tasks.is_empty()
}
/// Returns true if sibling tasks were spawned; Otherwise, false.
pub fn has_sibling_tasks(&self) -> bool
{
!self.sibling_tasks.is_empty()
}
/// Get an Iterator over the spawned child tasks.
///
/// This will remove all the tasks from this spawner.
pub fn drain_child_tasks(&mut self) -> Drain<Box<TaskStateMachine>>
{
self.child_tasks.drain(..)
}
/// Get an Iterator over the spawned sibling tasks.
///
/// This will remove all the tasks from this spawner.
pub fn drain_sibling_tasks(&mut self) -> Drain<Box<TaskStateMachine>>
{
self.sibling_tasks.drain(..)
}
}

View File

@ -1,9 +0,0 @@
///
pub enum Status
{
///
Waiting,
///
Finished
}

View File

@ -1,15 +1,40 @@
use std::cmp::Ordering;
use ::task_state::TaskState;
use ::mortality::Mortality;
use ::priority::Priority;
use ::spawner::Spawner;
pub trait Task: Send + Sync
{
/// Get the name for this Task.
/// This does not need to be unique.
fn get_name(&self) -> &str;
fn get_state(&self) -> TaskState;
fn process(&mut self);
/// Get whether or not that this task
/// should be reset and rerun after
/// it has finished.
fn get_mortality(&self) -> Mortality;
/// Get the Priority level to run
/// this task at.
fn get_priority(&self) -> Priority;
/// Reset the task to its initial state.
fn reset(&mut self);
/// Process a single iteration of this Task.
/// This will continue to be called until
/// the task has been marked as finished.
///
/// This returns true, if the process is complete;
/// Otherwise, it returns false.
///
/// The given Spawner can be used to spawn child
/// or sibling tasks.
fn process(&mut self, spawner: &mut Spawner) -> bool;
}
@ -17,9 +42,21 @@ pub trait Task: Send + Sync
impl PartialEq for Task
{
fn eq(&self, other: &Self) -> bool
{
// Compare them first by priority, then by name.
match self.get_priority().cmp(&other.get_priority())
{
Ordering::Equal =>
{
self.get_name().to_string().eq(&other.get_name().to_string())
}
_ =>
{
false
}
}
}
}
impl Eq for Task
@ -37,7 +74,24 @@ impl PartialOrd for Task
impl Ord for Task
{
fn cmp(&self, other: &Self) -> Ordering
{
// Compare them first by priority, then by name.
match self.get_priority().cmp(&other.get_priority())
{
Ordering::Less =>
{
Ordering::Less
}
Ordering::Equal =>
{
self.get_name().to_string().cmp(&other.get_name().to_string())
}
Ordering::Greater =>
{
Ordering::Greater
}
}
}
}

View File

@ -1,6 +1,9 @@
use ::mortality::Mortality;
use ::priority::Priority;
use ::spawner::Spawner;
use ::task::Task;
use ::taskable::Taskable;
use ::task_state::TaskState;
use ::task_state_machine::TaskStateMachine;
@ -8,46 +11,138 @@ use ::task_state::TaskState;
pub struct TaskContainer<T>
{
///
pub state: TaskState,
state: TaskState,
///
pub task: T
task: T
}
impl<T> TaskContainer<T> where T: Taskable
impl<T> TaskContainer<T> where T: Task
{
///
pub fn new(task: T) -> TaskContainer<T>
pub fn new(decorated_task: T) -> TaskContainer<T>
{
TaskContainer
{
state: TaskState::Starting,
task: task
task: decorated_task
}
}
}
impl<T> Task for TaskContainer<T> where T: Taskable
impl<T> Task for TaskContainer<T> where T: Task
{
///
fn get_name(&self) -> &str
{
// Just return the contained Task's name.
self.task.get_name()
}
fn get_mortality(&self) -> Mortality
{
// Just return the contained Task's Mortality.
self.task.get_mortality()
}
fn get_priority(&self) -> Priority
{
// Just return the contained Task's Priority.
self.task.get_priority()
}
fn reset(&mut self)
{
debug!("{}: Reseting.", self.get_name());
// Set the state back to the starting state.
self.state = TaskState::Starting;
// Reset the contained Task.
self.task.reset();
}
fn process(&mut self, spawner: &mut Spawner) -> bool
{
match self.state
{
TaskState::Starting =>
{
debug!("{}: Switching from Starting to Processing.",
self.get_name());
// Move directly to the Processing state.
self.state = TaskState::Processing;
// Return that this task is not finished.
false
}
TaskState::Processing =>
{
debug!("{}: Processing task.", self.get_name());
// Process the contained Task.
if self.task.process(spawner) == true
{
// The task is finished processing.
self.state = TaskState::Finished;
}
else
{
// The task is still running, so check to see
// if any child tasks were spawned.
if spawner.has_child_tasks() == true
{
// There are child tasks, so change the state
// to be waiting while the children are processed.
self.state = TaskState::Waiting;
}
}
// Return that this task is not finished.
false
}
TaskState::Waiting =>
{
debug!("{}: Waiting for child Tasks to complete.",
self.get_name());
// Return that this task is not finished.
false
}
TaskState::Finished =>
{
debug!("{}: Task finished.", self.get_name());
// Return that this task is finished.
true
}
TaskState::Unknown =>
{
warn!("{}: Trying to process a task in an Unknown state.",
self.get_name());
// The task is in an Unknown state, return that
// it is not finished. The Unknown state
// should be tested for to make sure that this
// task is no longer processed.
false
}
}
}
}
impl<T> TaskStateMachine for TaskContainer<T> where T: Task
{
///
fn get_state(&self) -> TaskState
{
//
self.state
}
///
fn process(&mut self)
{
self.state = TaskState::Processing;
println!("Processing task.");
self.state = TaskState::Finished;
}
}

View File

@ -1,5 +1,5 @@
/// The different states a Task can go through during its lifetime.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub enum TaskState
{
/// The state that every Task starts in.
@ -21,13 +21,6 @@ pub enum TaskState
/// and UNKNOWN states.
Waiting,
/// The state a Task is in once it is done processing
/// child tasks, but prior to going back to the PROCESSING state.
///
/// The DONE_WAITING state can only lead to the PROCESSING
/// and UNKNOWN states.
DoneWaiting,
/// The state a Task will be in when it is FINISHED processing
/// and ready to be destroyed.
///
@ -59,11 +52,6 @@ impl TaskState
"Waiting"
}
TaskState::DoneWaiting =>
{
"DoneWaiting"
}
TaskState::Processing =>
{
"Processing"

89
src/task_state_machine.rs Normal file
View File

@ -0,0 +1,89 @@
use std::cmp::Ordering;
use ::task::Task;
use ::task_state::TaskState;
/// A trait
pub trait TaskStateMachine: Task
{
///
fn get_state(&self) -> TaskState;
}
impl PartialEq for TaskStateMachine
{
fn eq(&self, other: &Self) -> bool
{
// Compare them first by priority, then by name.
match self.get_priority().cmp(&other.get_priority())
{
Ordering::Equal =>
{
self.get_name().to_string().eq(&other.get_name().to_string())
}
_ =>
{
false
}
}
}
}
impl Eq for TaskStateMachine
{
}
impl PartialOrd for TaskStateMachine
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering>
{
Some(self.cmp(other))
}
}
impl Ord for TaskStateMachine
{
fn cmp(&self, other: &Self) -> Ordering
{
// Compare them first by priority, then by name.
match self.get_priority().cmp(&other.get_priority())
{
Ordering::Less =>
{
Ordering::Less
}
Ordering::Equal =>
{
match self.get_state().cmp(&other.get_state())
{
Ordering::Less =>
{
Ordering::Less
}
Ordering::Equal =>
{
self.get_name().to_string().cmp(
&other.get_name().to_string())
}
Ordering::Greater =>
{
Ordering::Greater
}
}
}
Ordering::Greater =>
{
Ordering::Greater
}
}
}
}

View File

@ -1,10 +0,0 @@
///
pub enum TaskType
{
/// A task ment to get its own thread so that
/// it can run as long as it needs to.
Immortal,
///
Mortal
}

View File

@ -1,9 +0,0 @@
///
pub trait Taskable: Send + Sync
{
///
fn get_name(&self) -> &str;
///
fn process(&mut self);
}

View File

@ -1,5 +1,5 @@
/// The different states a Thread can go through during its lifetime.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub enum ThreadState
{
/// The state that every Thread starts in.