diff --git a/src/blocking_thread.rs b/src/blocking_thread.rs index ba7cf3e..22f1ceb 100644 --- a/src/blocking_thread.rs +++ b/src/blocking_thread.rs @@ -1,8 +1,10 @@ use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::Duration; +use ::spawner::Spawner; use ::task::Task; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; use ::thread::Thread; use ::thread_state::ThreadState; @@ -15,7 +17,7 @@ pub struct BlockingThread state: ThreadState, /// - current_task: Box, + current_task: Box, /// shutdown_receiver: Receiver, @@ -24,7 +26,10 @@ pub struct BlockingThread state_sender: Sender, /// - continue_running: bool + continue_running: bool, + + /// + spawner: Spawner } @@ -32,7 +37,7 @@ pub struct BlockingThread impl BlockingThread { /// - pub fn new(task: Box, + pub fn new(task: Box, shutdown_r: Receiver, state_s: Sender) -> BlockingThread @@ -43,7 +48,8 @@ impl BlockingThread current_task: task, shutdown_receiver: shutdown_r, state_sender: state_s, - continue_running: true + continue_running: true, + spawner: Spawner::new() } } @@ -130,7 +136,7 @@ impl Thread for BlockingThread { // Process the task this thread is // currently working on. - self.current_task.process(); + self.current_task.process(&mut self.spawner); match self.current_task.get_state() { diff --git a/src/child_task.rs b/src/child_task.rs new file mode 100644 index 0000000..2c9d8a2 --- /dev/null +++ b/src/child_task.rs @@ -0,0 +1,62 @@ +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; +use ::task::Task; + + + +/// A Task created from another Task that is required +/// to be completed before the parent task can finish. +pub struct ChildTask +{ + /// + task: T +} + + + +impl ChildTask where T: Task +{ + /// Decorates a task with the required portions + /// needed for children of other Tasks. + pub fn new(decorated_task: T) -> ChildTask + { + ChildTask + { + task: decorated_task + } + } +} + +impl Task for ChildTask where T: Task +{ + fn get_name(&self) -> &str + { + // Just return the contained Task's name. + self.task.get_name() + } + + fn get_mortality(&self) -> Mortality + { + // Just return the contained Task's Mortality. + self.task.get_mortality() + } + + fn get_priority(&self) -> Priority + { + // Just return the contained Task's Priority. + self.task.get_priority() + } + + fn reset(&mut self) + { + // Reset the contained Task. + self.task.reset(); + } + + fn process(&mut self, spawner: &mut Spawner) -> bool + { + // Process the contained Task. + self.task.process(spawner) + } +} diff --git a/src/compute_thread.rs b/src/compute_thread.rs index 8871321..f14c339 100644 --- a/src/compute_thread.rs +++ b/src/compute_thread.rs @@ -4,8 +4,10 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::Duration; +use ::spawner::Spawner; use ::task::Task; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; use ::thread::Thread; use ::thread_state::ThreadState; @@ -18,10 +20,10 @@ pub struct ComputeThread state: ThreadState, /// - current_task: Option>, + current_task: Option>, /// - task_queue: Arc>>>, + task_queue: Arc>>>, /// shutdown_receiver: Receiver, @@ -31,6 +33,9 @@ pub struct ComputeThread /// continue_running: bool, + + /// + spawner: Spawner } @@ -38,7 +43,7 @@ pub struct ComputeThread impl ComputeThread { /// - pub fn new(queue: Arc>>>, + pub fn new(queue: Arc>>>, shutdown_r: Receiver, state_s: Sender) -> ComputeThread @@ -50,7 +55,8 @@ impl ComputeThread task_queue: queue, shutdown_receiver: shutdown_r, state_sender: state_s, - continue_running: true + continue_running: true, + spawner: Spawner::new() } } @@ -113,7 +119,7 @@ impl ComputeThread } /// Queues a task to be processed. - fn queue_task(&mut self, task: Box) + fn queue_task(&mut self, task: Box) { // Just add the task to the queue. match self.task_queue.lock() @@ -196,9 +202,10 @@ impl Thread for ComputeThread { Some(ref mut task) => { + println!("Task FOUND. Calling process."); // Process the task this thread is // currently working on. - task.process(); + task.process(&mut self.spawner); match task.get_state() { @@ -215,12 +222,14 @@ impl Thread for ComputeThread None => { + println!("Task NOT found. Looking for new task."); // Try to get a task to work on. self.retrieve_task(); match self.current_task { Some(_) => { + println!("Task RETRIEVED."); // We have a task to work on, so switch to // a Processing state. self.change_state(ThreadState::Processing); @@ -228,6 +237,7 @@ impl Thread for ComputeThread None => { + println!("Taskless. Idling thread."); // If we don't have a task to process, // then we may need to switch over to // an idle state. diff --git a/src/lib.rs b/src/lib.rs index 26b7b5a..c0d70c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,18 +5,33 @@ extern crate chrono; -mod scheduler; +mod mortality; +mod priority; + +pub mod monitor; +mod spawner; + +pub mod task_state; mod task; -mod taskable; -mod task_container; -mod task_state; -mod blocking_thread; -mod compute_thread; -mod thread; -mod thread_data; -mod thread_state; +pub mod child_task; +pub mod task_container; +pub mod task_state_machine; + +pub mod thread_data; +pub mod thread_state; +pub mod thread; +pub mod blocking_thread; +pub mod compute_thread; + +mod scheduler; +pub use self::mortality::Mortality; +pub use self::priority::Priority; + +pub use self::spawner::Spawner; + +pub use self::task::Task; + pub use self::scheduler::Scheduler; -pub use self::taskable::Taskable; diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 0000000..f136567 --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,18 @@ +/// +pub struct Monitor +{ + g: i8 +} + + + +impl Monitor +{ + pub fn new() -> Monitor + { + Monitor + { + g: 0i8 + } + } +} diff --git a/src/mortality.rs b/src/mortality.rs new file mode 100644 index 0000000..951204b --- /dev/null +++ b/src/mortality.rs @@ -0,0 +1,56 @@ +/// Tasks can either be removed from the processing queue or added +/// back to the queue when they are finished processing. +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +pub enum Mortality +{ + /// A task that will be reset when finished so that + /// instead of dying, it is reborn. + Immortal, + + /// A task that will be done processing when it is finished. + Mortal +} + + + +impl Mortality +{ + /// Get a str representation of this variant. + pub fn to_str(&self) -> &'static str + { + match *self + { + Mortality::Immortal => + { + "Immortal" + } + + Mortality::Mortal => + { + "Mortal" + } + } + } + + /// Get a String representation of this variant. + pub fn to_string(&self) -> String + { + String::from(self.to_str()) + } +} + +impl ::std::fmt::Debug for Mortality +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} + +impl ::std::fmt::Display for Mortality +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} diff --git a/src/priority.rs b/src/priority.rs new file mode 100644 index 0000000..29baa3b --- /dev/null +++ b/src/priority.rs @@ -0,0 +1,82 @@ +/// A set of priority levels for tasks. +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] +pub enum Priority +{ + /// The lowest priority level. This is for + /// tasks that you don't care when they finish. + Lowest, + + /// A low priority task. This is for tasks that are not + /// as important as normal tasks. + Low, + + /// A normal priority task. This is for most tasks to use. + Normal, + + /// A high priority task. This is for tasks that need to make + /// sure they are scheduled to be done prior to most other tasks. + High, + + /// The highest priority task. This is for tasks that + /// must be run right away. + Highest +} + + + +impl Priority +{ + /// Get a str representation of this variant. + pub fn to_str(&self) -> &'static str + { + match *self + { + Priority::Lowest => + { + "Lowest" + } + + Priority::Low => + { + "Low" + } + + Priority::Normal => + { + "Normal" + } + + Priority::High => + { + "High" + } + + Priority::Highest => + { + "Highest" + } + } + } + + /// Get a String representation of this variant. + pub fn to_string(&self) -> String + { + String::from(self.to_str()) + } +} + +impl ::std::fmt::Debug for Priority +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} + +impl ::std::fmt::Display for Priority +{ + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result + { + write!(f, "{}", self.to_str()) + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index 42fad0b..169a865 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -9,7 +9,7 @@ use std::vec::Vec; use chrono::MonotonicTime; use ::task::Task; -use ::taskable::Taskable; +use ::task_state_machine::TaskStateMachine; use ::task_container::TaskContainer; use ::blocking_thread::BlockingThread; use ::compute_thread::ComputeThread; @@ -42,7 +42,7 @@ pub struct Scheduler blocking_threads: Vec, /// The main task queue that threads can use to process tasks. - task_queue: Arc>>> + task_queue: Arc>>> } @@ -105,7 +105,8 @@ impl Scheduler } /// Queues a task to be processed. - pub fn queue_task(&mut self, task: T) where T: Taskable + 'static + pub fn queue_task(&mut self, task: T) + where T: Task + 'static { let container: TaskContainer; @@ -128,7 +129,8 @@ impl Scheduler } /// Queues a task, that is known to block, to be processed. - pub fn queue_blocking_task(&mut self, task: T) where T: Taskable+ 'static + pub fn queue_blocking_task(&mut self, task: T) + where T: Task + 'static { let data: ThreadData; let container: TaskContainer; @@ -255,7 +257,8 @@ impl Scheduler // Calculate how many threads we would need to // process all the tasks in our queue if each // task was given its own thread. - println!("task_count: {}\navailable_threads: {}", task_count, available_threads); + println!("task_count: {}\navailable_threads: {}", + task_count, available_threads); desired_threads = 0u64; if task_count > available_threads { @@ -283,7 +286,6 @@ impl Scheduler /// fn trim_compute_threads(&mut self) { - } /// @@ -330,7 +332,7 @@ impl Scheduler ThreadData::new(join_handle, shutdown_channel.0, state_channel.1) } - fn spawn_blocking_thread(&self, task: Box) -> ThreadData + fn spawn_blocking_thread(&self, task: Box) -> ThreadData { let shutdown_channel: (Sender, Receiver); let state_channel: (Sender, Receiver); diff --git a/src/spawner.rs b/src/spawner.rs new file mode 100644 index 0000000..4571df4 --- /dev/null +++ b/src/spawner.rs @@ -0,0 +1,92 @@ +use std::vec::Drain; + +use ::child_task::ChildTask; +use ::monitor::Monitor; +use ::task::Task; +use ::task_container::TaskContainer; +use ::task_state_machine::TaskStateMachine; + + + +/// This structure contains the child and sibling Tasks +/// that a task has spawned while it was being processed. +pub struct Spawner +{ + /// The child tasks that have been spawned. + /// + /// Please use the functions so that they + /// can be properly wrapped. + child_tasks: Vec>, + + /// The sibling tasks that have been spawned. + /// + /// Please use the functions so that they + /// can be properly wrapped. + sibling_tasks: Vec> +} + + + +impl Spawner +{ + /// Creates a new Spawner structure. + pub fn new() -> Spawner + { + // Create the new Spawner. + Spawner + { + child_tasks: Vec::new(), + sibling_tasks: Vec::new() + } + } + + /// Spawn a new child task. + pub fn spawn_child_task(&mut self, child: T) + where T: Task + 'static + { + let child_task: ChildTask; + let container: TaskContainer>; + + child_task = ChildTask::new(child); + container = TaskContainer::new(child_task); + self.child_tasks.push(Box::new(container)); + } + + /// Spawn a new sibling task. + pub fn spawn_sibling_task(&mut self, sibling: T) + where T: Task + 'static + { + let container: TaskContainer; + + container = TaskContainer::new(sibling); + self.sibling_tasks.push(Box::new(container)); + } + + /// Returns true if child tasks were spawned; Otherwise, false. + pub fn has_child_tasks(&self) -> bool + { + !self.child_tasks.is_empty() + } + + /// Returns true if sibling tasks were spawned; Otherwise, false. + pub fn has_sibling_tasks(&self) -> bool + { + !self.sibling_tasks.is_empty() + } + + /// Get an Iterator over the spawned child tasks. + /// + /// This will remove all the tasks from this spawner. + pub fn drain_child_tasks(&mut self) -> Drain> + { + self.child_tasks.drain(..) + } + + /// Get an Iterator over the spawned sibling tasks. + /// + /// This will remove all the tasks from this spawner. + pub fn drain_sibling_tasks(&mut self) -> Drain> + { + self.sibling_tasks.drain(..) + } +} diff --git a/src/status.rs b/src/status.rs deleted file mode 100644 index ee4e988..0000000 --- a/src/status.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// -pub enum Status -{ - /// - Waiting, - - /// - Finished -} diff --git a/src/task.rs b/src/task.rs index 254b881..b6f74eb 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,15 +1,40 @@ use std::cmp::Ordering; -use ::task_state::TaskState; +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; pub trait Task: Send + Sync { + /// Get the name for this Task. + /// This does not need to be unique. fn get_name(&self) -> &str; - fn get_state(&self) -> TaskState; - fn process(&mut self); + /// Get whether or not that this task + /// should be reset and rerun after + /// it has finished. + fn get_mortality(&self) -> Mortality; + + /// Get the Priority level to run + /// this task at. + fn get_priority(&self) -> Priority; + + + /// Reset the task to its initial state. + fn reset(&mut self); + + /// Process a single iteration of this Task. + /// This will continue to be called until + /// the task has been marked as finished. + /// + /// This returns true, if the process is complete; + /// Otherwise, it returns false. + /// + /// The given Spawner can be used to spawn child + /// or sibling tasks. + fn process(&mut self, spawner: &mut Spawner) -> bool; } @@ -18,7 +43,19 @@ impl PartialEq for Task { fn eq(&self, other: &Self) -> bool { - self.get_name().to_string().eq(&other.get_name().to_string()) + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Equal => + { + self.get_name().to_string().eq(&other.get_name().to_string()) + } + + _ => + { + false + } + } } } @@ -38,6 +75,23 @@ impl Ord for Task { fn cmp(&self, other: &Self) -> Ordering { - self.get_name().to_string().cmp(&other.get_name().to_string()) + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + self.get_name().to_string().cmp(&other.get_name().to_string()) + } + + Ordering::Greater => + { + Ordering::Greater + } + } } } diff --git a/src/task_container.rs b/src/task_container.rs index 97ee6ac..62e0d8d 100644 --- a/src/task_container.rs +++ b/src/task_container.rs @@ -1,6 +1,9 @@ +use ::mortality::Mortality; +use ::priority::Priority; +use ::spawner::Spawner; use ::task::Task; -use ::taskable::Taskable; use ::task_state::TaskState; +use ::task_state_machine::TaskStateMachine; @@ -8,46 +11,138 @@ use ::task_state::TaskState; pub struct TaskContainer { /// - pub state: TaskState, + state: TaskState, /// - pub task: T + task: T } -impl TaskContainer where T: Taskable +impl TaskContainer where T: Task { /// - pub fn new(task: T) -> TaskContainer + pub fn new(decorated_task: T) -> TaskContainer { TaskContainer { state: TaskState::Starting, - task: task + task: decorated_task } } } -impl Task for TaskContainer where T: Taskable +impl Task for TaskContainer where T: Task { - /// fn get_name(&self) -> &str { + // Just return the contained Task's name. self.task.get_name() } + fn get_mortality(&self) -> Mortality + { + // Just return the contained Task's Mortality. + self.task.get_mortality() + } + + fn get_priority(&self) -> Priority + { + // Just return the contained Task's Priority. + self.task.get_priority() + } + + fn reset(&mut self) + { + debug!("{}: Reseting.", self.get_name()); + + // Set the state back to the starting state. + self.state = TaskState::Starting; + + // Reset the contained Task. + self.task.reset(); + } + + fn process(&mut self, spawner: &mut Spawner) -> bool + { + match self.state + { + TaskState::Starting => + { + debug!("{}: Switching from Starting to Processing.", + self.get_name()); + + // Move directly to the Processing state. + self.state = TaskState::Processing; + + // Return that this task is not finished. + false + } + + TaskState::Processing => + { + debug!("{}: Processing task.", self.get_name()); + + // Process the contained Task. + if self.task.process(spawner) == true + { + // The task is finished processing. + self.state = TaskState::Finished; + } + else + { + // The task is still running, so check to see + // if any child tasks were spawned. + if spawner.has_child_tasks() == true + { + // There are child tasks, so change the state + // to be waiting while the children are processed. + self.state = TaskState::Waiting; + } + } + + // Return that this task is not finished. + false + } + + TaskState::Waiting => + { + debug!("{}: Waiting for child Tasks to complete.", + self.get_name()); + + // Return that this task is not finished. + false + } + + TaskState::Finished => + { + debug!("{}: Task finished.", self.get_name()); + + // Return that this task is finished. + true + } + + TaskState::Unknown => + { + warn!("{}: Trying to process a task in an Unknown state.", + self.get_name()); + + // The task is in an Unknown state, return that + // it is not finished. The Unknown state + // should be tested for to make sure that this + // task is no longer processed. + false + } + } + } +} + +impl TaskStateMachine for TaskContainer where T: Task +{ /// fn get_state(&self) -> TaskState { + // self.state } - - /// - fn process(&mut self) - { - self.state = TaskState::Processing; - println!("Processing task."); - self.state = TaskState::Finished; - } } diff --git a/src/task_state.rs b/src/task_state.rs index 9621b8e..0f517b9 100644 --- a/src/task_state.rs +++ b/src/task_state.rs @@ -1,5 +1,5 @@ /// The different states a Task can go through during its lifetime. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] pub enum TaskState { /// The state that every Task starts in. @@ -21,13 +21,6 @@ pub enum TaskState /// and UNKNOWN states. Waiting, - /// The state a Task is in once it is done processing - /// child tasks, but prior to going back to the PROCESSING state. - /// - /// The DONE_WAITING state can only lead to the PROCESSING - /// and UNKNOWN states. - DoneWaiting, - /// The state a Task will be in when it is FINISHED processing /// and ready to be destroyed. /// @@ -59,11 +52,6 @@ impl TaskState "Waiting" } - TaskState::DoneWaiting => - { - "DoneWaiting" - } - TaskState::Processing => { "Processing" diff --git a/src/task_state_machine.rs b/src/task_state_machine.rs new file mode 100644 index 0000000..05bd200 --- /dev/null +++ b/src/task_state_machine.rs @@ -0,0 +1,89 @@ +use std::cmp::Ordering; + +use ::task::Task; +use ::task_state::TaskState; + + + +/// A trait +pub trait TaskStateMachine: Task +{ + /// + fn get_state(&self) -> TaskState; +} + + + +impl PartialEq for TaskStateMachine +{ + fn eq(&self, other: &Self) -> bool + { + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Equal => + { + self.get_name().to_string().eq(&other.get_name().to_string()) + } + + _ => + { + false + } + } + } +} + +impl Eq for TaskStateMachine +{ +} + +impl PartialOrd for TaskStateMachine +{ + fn partial_cmp(&self, other: &Self) -> Option + { + Some(self.cmp(other)) + } +} + +impl Ord for TaskStateMachine +{ + fn cmp(&self, other: &Self) -> Ordering + { + // Compare them first by priority, then by name. + match self.get_priority().cmp(&other.get_priority()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + match self.get_state().cmp(&other.get_state()) + { + Ordering::Less => + { + Ordering::Less + } + + Ordering::Equal => + { + self.get_name().to_string().cmp( + &other.get_name().to_string()) + } + + Ordering::Greater => + { + Ordering::Greater + } + } + } + + Ordering::Greater => + { + Ordering::Greater + } + } + } +} diff --git a/src/task_type.rs b/src/task_type.rs deleted file mode 100644 index dd9d4e6..0000000 --- a/src/task_type.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// -pub enum TaskType -{ - /// A task ment to get its own thread so that - /// it can run as long as it needs to. - Immortal, - - /// - Mortal -} diff --git a/src/taskable.rs b/src/taskable.rs deleted file mode 100644 index 5ba83b9..0000000 --- a/src/taskable.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// -pub trait Taskable: Send + Sync -{ - /// - fn get_name(&self) -> &str; - - /// - fn process(&mut self); -} diff --git a/src/thread_state.rs b/src/thread_state.rs index a520522..99797be 100644 --- a/src/thread_state.rs +++ b/src/thread_state.rs @@ -1,5 +1,5 @@ /// The different states a Thread can go through during its lifetime. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] pub enum ThreadState { /// The state that every Thread starts in.