From bc4ed6d0bcd2fb77feec61dc6b010ca53bfdfeb6 Mon Sep 17 00:00:00 2001 From: Jason Travis Smith Date: Sat, 25 Jun 2016 05:26:10 -0400 Subject: [PATCH] Most of the new task processing is now complete. The task states and how to spawn new tasks has all been sorted out. All that is left at this point is to handle the thread portion of task management and to determine how to get messages from the new child task's completion back to the parent task. --- src/blocking_thread.rs | 16 +++-- src/child_task.rs | 62 +++++++++++++++++++ src/compute_thread.rs | 22 +++++-- src/lib.rs | 35 ++++++++--- src/monitor.rs | 18 ++++++ src/mortality.rs | 56 +++++++++++++++++ src/priority.rs | 82 ++++++++++++++++++++++++ src/scheduler.rs | 16 ++--- src/spawner.rs | 92 +++++++++++++++++++++++++++ src/status.rs | 9 --- src/task.rs | 64 +++++++++++++++++-- src/task_container.rs | 127 +++++++++++++++++++++++++++++++++----- src/task_state.rs | 14 +---- src/task_state_machine.rs | 89 ++++++++++++++++++++++++++ src/task_type.rs | 10 --- src/taskable.rs | 9 --- src/thread_state.rs | 2 +- 17 files changed, 632 insertions(+), 91 deletions(-) create mode 100644 src/child_task.rs create mode 100644 src/monitor.rs create mode 100644 src/mortality.rs create mode 100644 src/priority.rs create mode 100644 src/spawner.rs delete mode 100644 src/status.rs create mode 100644 src/task_state_machine.rs delete mode 100644 src/task_type.rs delete mode 100644 src/taskable.rs diff --git a/src/blocking_thread.rs b/src/blocking_thread.rs index ba7cf3e..22f1ceb 100644 --- a/src/blocking_thread.rs +++ b/src/blocking_thread.rs @@ -1,8 +1,10 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::Duration; +use ::spawner::Spawner; use ::task::Task; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; use ::thread::Thread; use ::thread_state::ThreadState; @@ -15,7 +17,7 @@ pub struct BlockingThread state: ThreadState, /// - current_task: Box, + current_task: Box, /// shutdown_receiver: Receiver, @@ -24,7 +26,10 @@ pub struct BlockingThread state_sender: Sender, /// - continue_running: bool + continue_running: bool, + + /// + spawner: Spawner } @@ -32,7 +37,7 @@ pub struct BlockingThread impl BlockingThread { /// - pub fn new(task: Box, + pub fn new(task: Box, shutdown_r: Receiver, state_s: Sender) -> BlockingThread @@ -43,7 +48,8 @@ impl BlockingThread current_task: task, shutdown_receiver: shutdown_r, state_sender: state_s, - continue_running: true + continue_running: true, + spawner: Spawner::new() } } @@ -130,7 +136,7 @@ impl Thread for BlockingThread { // Process the task this thread is // currently working on. - self.current_task.process(); + self.current_task.process(&mut self.spawner); match self.current_task.get_state() { diff --git a/src/child_task.rs b/src/child_task.rs new file mode 100644 index 0000000..2c9d8a2 --- /dev/null +++ b/src/child_task.rs @@ -0,0 +1,62 @@ +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; +use ::task::Task; + + + +/// A Task created from another Task that is required +/// to be completed before the parent task can finish. +pub struct ChildTask +{ + /// + task: T +} + + + +impl ChildTask where T: Task +{ + /// Decorates a task with the required portions + /// needed for children of other Tasks. + pub fn new(decorated_task: T) -> ChildTask + { + ChildTask + { + task: decorated_task + } + } +} + +impl Task for ChildTask where T: Task +{ + fn get_name(&self) -> &str + { + // Just return the contained Task's name. + self.task.get_name() + } + + fn get_mortality(&self) -> Mortality + { + // Just return the contained Task's Mortality. + self.task.get_mortality() + } + + fn get_priority(&self) -> Priority + { + // Just return the contained Task's Priority. + self.task.get_priority() + } + + fn reset(&mut self) + { + // Reset the contained Task. + self.task.reset(); + } + + fn process(&mut self, spawner: &mut Spawner) -> bool + { + // Process the contained Task. + self.task.process(spawner) + } +} diff --git a/src/compute_thread.rs b/src/compute_thread.rs index 8871321..f14c339 100644 --- a/src/compute_thread.rs +++ b/src/compute_thread.rs @@ -4,8 +4,10 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::Duration; +use ::spawner::Spawner; use ::task::Task; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; use ::thread::Thread; use ::thread_state::ThreadState; @@ -18,10 +20,10 @@ pub struct ComputeThread state: ThreadState, /// - current_task: Option>, + current_task: Option>, /// - task_queue: Arc>>>, + task_queue: Arc>>>, /// shutdown_receiver: Receiver, @@ -31,6 +33,9 @@ pub struct ComputeThread /// continue_running: bool, + + /// + spawner: Spawner } @@ -38,7 +43,7 @@ pub struct ComputeThread impl ComputeThread { /// - pub fn new(queue: Arc>>>, + pub fn new(queue: Arc>>>, shutdown_r: Receiver, state_s: Sender) -> ComputeThread @@ -50,7 +55,8 @@ impl ComputeThread task_queue: queue, shutdown_receiver: shutdown_r, state_sender: state_s, - continue_running: true + continue_running: true, + spawner: Spawner::new() } } @@ -113,7 +119,7 @@ impl ComputeThread } /// Queues a task to be processed. - fn queue_task(&mut self, task: Box) + fn queue_task(&mut self, task: Box) { // Just add the task to the queue. match self.task_queue.lock() @@ -196,9 +202,10 @@ impl Thread for ComputeThread { Some(ref mut task) => { + println!("Task FOUND. Calling process."); // Process the task this thread is // currently working on. - task.process(); + task.process(&mut self.spawner); match task.get_state() { @@ -215,12 +222,14 @@ impl Thread for ComputeThread None => { + println!("Task NOT found. Looking for new task."); // Try to get a task to work on. self.retrieve_task(); match self.current_task { Some(_) => { + println!("Task RETRIEVED."); // We have a task to work on, so switch to // a Processing state. self.change_state(ThreadState::Processing); @@ -228,6 +237,7 @@ impl Thread for ComputeThread None => { + println!("Taskless. Idling thread."); // If we don't have a task to process, // then we may need to switch over to // an idle state. diff --git a/src/lib.rs b/src/lib.rs index 26b7b5a..c0d70c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,18 +5,33 @@ extern crate chrono; -mod scheduler; +mod mortality; +mod priority; + +pub mod monitor; +mod spawner; + +pub mod task_state; mod task; -mod taskable; -mod task_container; -mod task_state; -mod blocking_thread; -mod compute_thread; -mod thread; -mod thread_data; -mod thread_state; +pub mod child_task; +pub mod task_container; +pub mod task_state_machine; + +pub mod thread_data; +pub mod thread_state; +pub mod thread; +pub mod blocking_thread; +pub mod compute_thread; + +mod scheduler; +pub use self::mortality::Mortality; +pub use self::priority::Priority; + +pub use self::spawner::Spawner; + +pub use self::task::Task; + pub use self::scheduler::Scheduler; -pub use self::taskable::Taskable; diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 0000000..f136567 --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,18 @@ +/// +pub struct Monitor +{ + g: i8 +} + + + +impl Monitor +{ + pub fn new() -> Monitor + { + Monitor + { + g: 0i8 + } + } +} diff --git a/src/mortality.rs b/src/mortality.rs new file mode 100644 index 0000000..951204b --- /dev/null +++ b/src/mortality.rs @@ -0,0 +1,56 @@ +/// Tasks can either be removed from the processing queue or added +/// back to the queue when they are finished processing. +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +pub enum Mortality +{ + /// A task that will be reset when finished so that + /// instead of dying, it is reborn. + Immortal, + + /// A task that will be done processing when it is finished. + Mortal +} + + + +impl Mortality +{ + /// Get a str representation of this variant. + pub fn to_str(&self) -> &'static str + { + match *self + { + Mortality::Immortal => + { + "Immortal" + } + + Mortality::Mortal => + { + "Mortal" + } + } + } + + /// Get a String representation of this variant. + pub fn to_string(&self) -> String + { + String::from(self.to_str()) + } +} + +impl ::std::fmt::Debug for Mortality +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} + +impl ::std::fmt::Display for Mortality +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} diff --git a/src/priority.rs b/src/priority.rs new file mode 100644 index 0000000..29baa3b --- /dev/null +++ b/src/priority.rs @@ -0,0 +1,82 @@ +/// A set of priority levels for tasks. +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +pub enum Priority +{ + /// The lowest priority level. This is for + /// tasks that you don't care when they finish. + Lowest, + + /// A low priority task. This is for tasks that are not + /// as important as normal tasks. + Low, + + /// A normal priority task. This is for most tasks to use. + Normal, + + /// A high priority task. This is for tasks that need to make + /// sure they are scheduled to be done prior to most other tasks. + High, + + /// The highest priority task. This is for tasks that + /// must be run right away. + Highest +} + + + +impl Priority +{ + /// Get a str representation of this variant. + pub fn to_str(&self) -> &'static str + { + match *self + { + Priority::Lowest => + { + "Lowest" + } + + Priority::Low => + { + "Low" + } + + Priority::Normal => + { + "Normal" + } + + Priority::High => + { + "High" + } + + Priority::Highest => + { + "Highest" + } + } + } + + /// Get a String representation of this variant. + pub fn to_string(&self) -> String + { + String::from(self.to_str()) + } +} + +impl ::std::fmt::Debug for Priority +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} + +impl ::std::fmt::Display for Priority +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index 42fad0b..169a865 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -9,7 +9,7 @@ use std::vec::Vec; use chrono::MonotonicTime; use ::task::Task; -use ::taskable::Taskable; +use ::task_state_machine::TaskStateMachine; use ::task_container::TaskContainer; use ::blocking_thread::BlockingThread; use ::compute_thread::ComputeThread; @@ -42,7 +42,7 @@ pub struct Scheduler blocking_threads: Vec, /// The main task queue that threads can use to process tasks. - task_queue: Arc>>> + task_queue: Arc>>> } @@ -105,7 +105,8 @@ impl Scheduler } /// Queues a task to be processed. - pub fn queue_task(&mut self, task: T) where T: Taskable + 'static + pub fn queue_task(&mut self, task: T) + where T: Task + 'static { let container: TaskContainer; @@ -128,7 +129,8 @@ impl Scheduler } /// Queues a task, that is known to block, to be processed. - pub fn queue_blocking_task(&mut self, task: T) where T: Taskable+ 'static + pub fn queue_blocking_task(&mut self, task: T) + where T: Task + 'static { let data: ThreadData; let container: TaskContainer; @@ -255,7 +257,8 @@ impl Scheduler // Calculate how many threads we would need to // process all the tasks in our queue if each // task was given its own thread. - println!("task_count: {}\navailable_threads: {}", task_count, available_threads); + println!("task_count: {}\navailable_threads: {}", + task_count, available_threads); desired_threads = 0u64; if task_count > available_threads { @@ -283,7 +286,6 @@ impl Scheduler /// fn trim_compute_threads(&mut self) { - } /// @@ -330,7 +332,7 @@ impl Scheduler ThreadData::new(join_handle, shutdown_channel.0, state_channel.1) } - fn spawn_blocking_thread(&self, task: Box) -> ThreadData + fn spawn_blocking_thread(&self, task: Box) -> ThreadData { let shutdown_channel: (Sender, Receiver); let state_channel: (Sender, Receiver); diff --git a/src/spawner.rs b/src/spawner.rs new file mode 100644 index 0000000..4571df4 --- /dev/null +++ b/src/spawner.rs @@ -0,0 +1,92 @@ +use std::vec::Drain; + +use ::child_task::ChildTask; +use ::monitor::Monitor; +use ::task::Task; +use ::task_container::TaskContainer; +use ::task_state_machine::TaskStateMachine; + + + +/// This structure contains the child and sibling Tasks +/// that a task has spawned while it was being processed. +pub struct Spawner +{ + /// The child tasks that have been spawned. + /// + /// Please use the functions so that they + /// can be properly wrapped. + child_tasks: Vec>, + + /// The sibling tasks that have been spawned. + /// + /// Please use the functions so that they + /// can be properly wrapped. + sibling_tasks: Vec> +} + + + +impl Spawner +{ + /// Creates a new Spawner structure. + pub fn new() -> Spawner + { + // Create the new Spawner. + Spawner + { + child_tasks: Vec::new(), + sibling_tasks: Vec::new() + } + } + + /// Spawn a new child task. + pub fn spawn_child_task(&mut self, child: T) + where T: Task + 'static + { + let child_task: ChildTask; + let container: TaskContainer>; + + child_task = ChildTask::new(child); + container = TaskContainer::new(child_task); + self.child_tasks.push(Box::new(container)); + } + + /// Spawn a new sibling task. + pub fn spawn_sibling_task(&mut self, sibling: T) + where T: Task + 'static + { + let container: TaskContainer; + + container = TaskContainer::new(sibling); + self.sibling_tasks.push(Box::new(container)); + } + + /// Returns true if child tasks were spawned; Otherwise, false. + pub fn has_child_tasks(&self) -> bool + { + !self.child_tasks.is_empty() + } + + /// Returns true if sibling tasks were spawned; Otherwise, false. + pub fn has_sibling_tasks(&self) -> bool + { + !self.sibling_tasks.is_empty() + } + + /// Get an Iterator over the spawned child tasks. + /// + /// This will remove all the tasks from this spawner. + pub fn drain_child_tasks(&mut self) -> Drain> + { + self.child_tasks.drain(..) + } + + /// Get an Iterator over the spawned sibling tasks. + /// + /// This will remove all the tasks from this spawner. + pub fn drain_sibling_tasks(&mut self) -> Drain> + { + self.sibling_tasks.drain(..) + } +} diff --git a/src/status.rs b/src/status.rs deleted file mode 100644 index ee4e988..0000000 --- a/src/status.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// -pub enum Status -{ - /// - Waiting, - - /// - Finished -} diff --git a/src/task.rs b/src/task.rs index 254b881..b6f74eb 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,15 +1,40 @@ use std::cmp::Ordering; -use ::task_state::TaskState; +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; pub trait Task: Send + Sync { + /// Get the name for this Task. + /// This does not need to be unique. fn get_name(&self) -> &str; - fn get_state(&self) -> TaskState; - fn process(&mut self); + /// Get whether or not that this task + /// should be reset and rerun after + /// it has finished. + fn get_mortality(&self) -> Mortality; + + /// Get the Priority level to run + /// this task at. + fn get_priority(&self) -> Priority; + + + /// Reset the task to its initial state. + fn reset(&mut self); + + /// Process a single iteration of this Task. + /// This will continue to be called until + /// the task has been marked as finished. + /// + /// This returns true, if the process is complete; + /// Otherwise, it returns false. + /// + /// The given Spawner can be used to spawn child + /// or sibling tasks. + fn process(&mut self, spawner: &mut Spawner) -> bool; } @@ -18,7 +43,19 @@ impl PartialEq for Task { fn eq(&self, other: &Self) -> bool { - self.get_name().to_string().eq(&other.get_name().to_string()) + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Equal => + { + self.get_name().to_string().eq(&other.get_name().to_string()) + } + + _ => + { + false + } + } } } @@ -38,6 +75,23 @@ impl Ord for Task { fn cmp(&self, other: &Self) -> Ordering { - self.get_name().to_string().cmp(&other.get_name().to_string()) + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + self.get_name().to_string().cmp(&other.get_name().to_string()) + } + + Ordering::Greater => + { + Ordering::Greater + } + } } } diff --git a/src/task_container.rs b/src/task_container.rs index 97ee6ac..62e0d8d 100644 --- a/src/task_container.rs +++ b/src/task_container.rs @@ -1,6 +1,9 @@ +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; use ::task::Task; -use ::taskable::Taskable; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; @@ -8,46 +11,138 @@ use ::task_state::TaskState; pub struct TaskContainer { /// - pub state: TaskState, + state: TaskState, /// - pub task: T + task: T } -impl TaskContainer where T: Taskable +impl TaskContainer where T: Task { /// - pub fn new(task: T) -> TaskContainer + pub fn new(decorated_task: T) -> TaskContainer { TaskContainer { state: TaskState::Starting, - task: task + task: decorated_task } } } -impl Task for TaskContainer where T: Taskable +impl Task for TaskContainer where T: Task { - /// fn get_name(&self) -> &str { + // Just return the contained Task's name. self.task.get_name() } + fn get_mortality(&self) -> Mortality + { + // Just return the contained Task's Mortality. + self.task.get_mortality() + } + + fn get_priority(&self) -> Priority + { + // Just return the contained Task's Priority. + self.task.get_priority() + } + + fn reset(&mut self) + { + debug!("{}: Reseting.", self.get_name()); + + // Set the state back to the starting state. + self.state = TaskState::Starting; + + // Reset the contained Task. + self.task.reset(); + } + + fn process(&mut self, spawner: &mut Spawner) -> bool + { + match self.state + { + TaskState::Starting => + { + debug!("{}: Switching from Starting to Processing.", + self.get_name()); + + // Move directly to the Processing state. + self.state = TaskState::Processing; + + // Return that this task is not finished. + false + } + + TaskState::Processing => + { + debug!("{}: Processing task.", self.get_name()); + + // Process the contained Task. + if self.task.process(spawner) == true + { + // The task is finished processing. + self.state = TaskState::Finished; + } + else + { + // The task is still running, so check to see + // if any child tasks were spawned. + if spawner.has_child_tasks() == true + { + // There are child tasks, so change the state + // to be waiting while the children are processed. + self.state = TaskState::Waiting; + } + } + + // Return that this task is not finished. + false + } + + TaskState::Waiting => + { + debug!("{}: Waiting for child Tasks to complete.", + self.get_name()); + + // Return that this task is not finished. + false + } + + TaskState::Finished => + { + debug!("{}: Task finished.", self.get_name()); + + // Return that this task is finished. + true + } + + TaskState::Unknown => + { + warn!("{}: Trying to process a task in an Unknown state.", + self.get_name()); + + // The task is in an Unknown state, return that + // it is not finished. The Unknown state + // should be tested for to make sure that this + // task is no longer processed. + false + } + } + } +} + +impl TaskStateMachine for TaskContainer where T: Task +{ /// fn get_state(&self) -> TaskState { + // self.state } - - /// - fn process(&mut self) - { - self.state = TaskState::Processing; - println!("Processing task."); - self.state = TaskState::Finished; - } } diff --git a/src/task_state.rs b/src/task_state.rs index 9621b8e..0f517b9 100644 --- a/src/task_state.rs +++ b/src/task_state.rs @@ -1,5 +1,5 @@ /// The different states a Task can go through during its lifetime. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] pub enum TaskState { /// The state that every Task starts in. @@ -21,13 +21,6 @@ pub enum TaskState /// and UNKNOWN states. Waiting, - /// The state a Task is in once it is done processing - /// child tasks, but prior to going back to the PROCESSING state. - /// - /// The DONE_WAITING state can only lead to the PROCESSING - /// and UNKNOWN states. - DoneWaiting, - /// The state a Task will be in when it is FINISHED processing /// and ready to be destroyed. /// @@ -59,11 +52,6 @@ impl TaskState "Waiting" } - TaskState::DoneWaiting => - { - "DoneWaiting" - } - TaskState::Processing => { "Processing" diff --git a/src/task_state_machine.rs b/src/task_state_machine.rs new file mode 100644 index 0000000..05bd200 --- /dev/null +++ b/src/task_state_machine.rs @@ -0,0 +1,89 @@ +use std::cmp::Ordering; + +use ::task::Task; +use ::task_state::TaskState; + + + +/// A trait +pub trait TaskStateMachine: Task +{ + /// + fn get_state(&self) -> TaskState; +} + + + +impl PartialEq for TaskStateMachine +{ + fn eq(&self, other: &Self) -> bool + { + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Equal => + { + self.get_name().to_string().eq(&other.get_name().to_string()) + } + + _ => + { + false + } + } + } +} + +impl Eq for TaskStateMachine +{ +} + +impl PartialOrd for TaskStateMachine +{ + fn partial_cmp(&self, other: &Self) -> Option + { + Some(self.cmp(other)) + } +} + +impl Ord for TaskStateMachine +{ + fn cmp(&self, other: &Self) -> Ordering + { + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + match self.get_state().cmp(&other.get_state()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + self.get_name().to_string().cmp( + &other.get_name().to_string()) + } + + Ordering::Greater => + { + Ordering::Greater + } + } + } + + Ordering::Greater => + { + Ordering::Greater + } + } + } +} diff --git a/src/task_type.rs b/src/task_type.rs deleted file mode 100644 index dd9d4e6..0000000 --- a/src/task_type.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// -pub enum TaskType -{ - /// A task ment to get its own thread so that - /// it can run as long as it needs to. - Immortal, - - /// - Mortal -} diff --git a/src/taskable.rs b/src/taskable.rs deleted file mode 100644 index 5ba83b9..0000000 --- a/src/taskable.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// -pub trait Taskable: Send + Sync -{ - /// - fn get_name(&self) -> &str; - - /// - fn process(&mut self); -} diff --git a/src/thread_state.rs b/src/thread_state.rs index a520522..99797be 100644 --- a/src/thread_state.rs +++ b/src/thread_state.rs @@ -1,5 +1,5 @@ /// The different states a Thread can go through during its lifetime. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] pub enum ThreadState { /// The state that every Thread starts in.