diff --git a/Cargo.lock b/Cargo.lock index 3361a4c..d1e502e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,11 +2,23 @@ name = "apprentice" version = "0.1.0" dependencies = [ + "chrono 0.9.0 (git+https://gitlab.com/CyberMages/chrono.git)", "scribe 0.1.0 (git+https://gitlab.com/CyberMages/scribe.git)", + "spellbook 0.1.0 (git+https://gitlab.com/CyberMages/spellbook.git)", ] +[[package]] +name = "chrono" +version = "0.9.0" +source = "git+https://gitlab.com/CyberMages/chrono.git#10f1633ac1334cb6ada86222c68efeea89141be7" + [[package]] name = "scribe" version = "0.1.0" -source = "git+https://gitlab.com/CyberMages/scribe.git#d6a6f5107c8d03b13e081c9378486781e31daa4e" +source = "git+https://gitlab.com/CyberMages/scribe.git#e52418d3bfc28cd1f03cc7f31af06fce2e03f844" + +[[package]] +name = "spellbook" +version = "0.1.0" +source = "git+https://gitlab.com/CyberMages/spellbook.git#f8526d248f3b7eb4f15f672368ac45a64c4eacbd" diff --git a/Cargo.toml b/Cargo.toml index a3cc451..3ad5776 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,11 @@ documentation = "" keywords = ["scheduler", "task", "thread"] +[dependencies.chrono] +git = "https://gitlab.com/CyberMages/chrono.git" + [dependencies.scribe] git = "https://gitlab.com/CyberMages/scribe.git" + +[dependencies.spellbook] +git = "https://gitlab.com/CyberMages/spellbook.git" diff --git a/README.md b/README.md new file mode 100644 index 0000000..8a51d8b --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Apprentice # diff --git a/examples/sample_tasks.rs b/examples/sample_tasks.rs index aff243a..e0cd723 100644 --- a/examples/sample_tasks.rs +++ b/examples/sample_tasks.rs @@ -8,5 +8,11 @@ use apprentice::*; pub fn main() { - println!("Hello world"); + let mut scheduler: Scheduler; + + scheduler = Scheduler::new(None, Some(4)); + scheduler.queue_task(Task::new("Test")); + scheduler.process_tasks(true); + scheduler.process_tasks(true); + //scheduler.process_tasks(true); } diff --git a/src/lib.rs b/src/lib.rs index 00d028c..9de434e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,22 @@ #[macro_use] extern crate scribe; +#[macro_use] +extern crate spellbook; + +extern crate chrono; + mod scheduler; mod task; +mod task_state; mod thread; +mod thread_data; +mod thread_state; pub use self::scheduler::Scheduler; pub use self::task::Task; -pub use self::thread::Thread; +pub use self::task_state::TaskState; diff --git a/src/scheduler.rs b/src/scheduler.rs index ae15a01..c3c0798 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,60 +1,514 @@ +use std::collections::binary_heap::BinaryHeap; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::thread::JoinHandle; +use std::time::Duration; +use std::vec::Vec; + +use ::task::Task; +use ::task_state::TaskState; use ::thread::Thread; +use ::thread_data::ThreadData; +use ::thread_state::ThreadState; + +use chrono::MonotonicTime; /// pub struct Scheduler { - /// - has_unlimited_threads: bool, + /// The minimum amount of compute threads that the + /// scheduler will keep alive. + minimum_thread_amount: u64, - /// + /// The maximum amount of compute threads that the + /// scheduler will have alive at one given time. maximum_thread_amount: u64, - /// - thread_list: Vec + /// Specify the maximum amount of time that a thread + /// can stay idle before it is marked to be killed. + maximum_thread_idle_time: Duration, + + /// The threads created to handle + /// non-blocking computation tasks. + compute_threads: Vec, + + /// The threads created to handle blocking tasks. + blocking_threads: Vec, + + /// The main task queue that threads can use to process tasks. + task_queue: Arc>> +} + + + +macro_rules! change_thread_state +{ + ($state: expr, $state_holder: expr, $sender: expr) => + { + { + $state_holder = $state; + match $sender.send($state_holder) + { + Ok(_) => + { + } + + Err(error) => + { + // We lost our connection to the + // state channel. + warn!("{}", error); + } + } + } + } } impl Scheduler { - /// - pub fn new(num_threads: Option) -> Scheduler + /// + pub fn new(min_threads: Option, max_threads: Option) -> Scheduler { - let thread_count: u64; - let unlimited_threads: bool; + let min_count: u64; + let max_count: u64; - // Check to see if the scheduler will - // be using unlimited threads. - match num_threads + // See if there is a minimum number of threads that + // the scheduler is supposed to keep around. + match min_threads + { + Some(count) => + { + min_count = count; + } + + None => + { + min_count = 0; + } + } + + // Get the maximum amount of threads that the + // scheduler can create. This is for compute threads + // since I/O threads will block. + match max_threads { Some(count) => { // Set the maximum number of threads to be the desired // amount. - thread_count = count; - unlimited_threads = false; + max_count = count; } None => { - // Set that there can be an unlimited amount of threads. - thread_count = 0; - unlimited_threads = true; + // We don't know how many threads we can create. + // Do the ideal thing and create 1 thread per core. + max_count = 1; } } - // Create the new Scheduler. + debug!("Creating Scheduler with {} max threads.", max_count); + Scheduler { - has_unlimited_threads: unlimited_threads, - maximum_thread_amount: thread_count, - thread_list: Vec::new() + minimum_thread_amount: min_count, + maximum_thread_amount: max_count, + maximum_thread_idle_time: Duration::new(5, 0), + compute_threads: Vec::with_capacity(max_count as usize), + blocking_threads: Vec::with_capacity(max_count as usize), + task_queue: Arc::new(Mutex::new(BinaryHeap::new())) } } - fn create_thread() + /// Queues a task to be processed. + pub fn queue_task(&mut self, task: Task) { + // Just add the task to the queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + guard.deref_mut().push(task); + } + + Err(error) => + { + error!("{}", error); + } + } + } + + /// Processing the currently queued tasks. + /// + /// If this is not a single iteration, then it will block + /// the current thread until all tasks are finished processed. + /// This does not count immortal tasks. + pub fn process_tasks(&mut self, is_single_iteration: bool) + { + let mut continue_processing: bool; + let mut new_thread_count: u64; + + // Continue to process the tasks as long as possible. + continue_processing = true; + while continue_processing == true + { + debug!("There are currently {} tasks to process.", + self.get_task_count()); + debug!("Currently running {} out of {} threads.", + self.compute_threads.len(), self.maximum_thread_amount); + + // Determine how many threads need to be created. + new_thread_count = self.determine_new_thread_amount(); + + // Create the threads that we determined that we needed + // to process the current tasks. + self.spawn_compute_threads(new_thread_count); + + // Check to see if this was to be a single iteration + // of the task scheduler. + if is_single_iteration == true + { + // This was only a single iteration, + // so stop the processing. + continue_processing = false; + } +// else +// { + // Let the thread sleep before + // handling the next iteration. + ::std::thread::sleep(Duration::new(1, 0)); +// } + } + } + + /// Get the amount of tasks in the queue. + fn get_task_count(&mut self) -> u64 + { + // Get the size of the task queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + return guard.deref_mut().len() as u64; + } + + Err(error) => + { + error!("{}", error); + } + } + + 0u64 + } + + /// This will calculate the new amount of compute threads + /// that need to be created. + fn determine_new_thread_amount(&mut self) -> u64 + { + let task_count: u64; + let thread_count: u64; + let potential_threads: u64; + let mut desired_threads: u64; + let mut available_threads: u64; + let mut new_thread_count: u64; + + // Determine the amount of tasks that need to be processed. + task_count = self.get_task_count(); + debug!("There are currently {} compute tasks to process.", task_count); + + // Determine how many threads we currently have available. + thread_count = self.compute_threads.len() as u64; + debug!("Currently running {} out of {} compute threads.", + thread_count, self.maximum_thread_amount); + + // Determine how many of these threads currently are + // not processing anything. + available_threads = 0u64; + for thread_data in self.compute_threads.iter() + { + match thread_data.state + { + ThreadState::Idle => + { + available_threads += 1u64; + } + + ThreadState::Starting => + { + available_threads += 1u64; + } + + _ => + { + } + } + } + + // Calculate how many threads we would need to + // process all the tasks in our queue if each + // task was given its own thread. + println!("task_count: {}\navailable_threads: {}", task_count, available_threads); + desired_threads = 0u64; + if task_count > available_threads + { + desired_threads = task_count - available_threads; + } + new_thread_count = desired_threads; + + // Calculate how many potential threads can be created. + potential_threads = self.maximum_thread_amount - thread_count; + + // We are limited by a hard ceiling of runnable threads. + // Make sure we have as many as we can to process + // the currently queued tasks. + if potential_threads < desired_threads + { + // Make sure we have atleast the maximum number + // of threads aloud. + new_thread_count = self.maximum_thread_amount - thread_count; + } + + // Return the determined amount of threads to create. + new_thread_count + } + + /// + fn spawn_compute_threads(&mut self, amount: u64) + { + let mut shutdown_channel: (Sender, Receiver); + let mut state_channel: (Sender, Receiver); + let mut shutdown_receiver: Receiver; + let mut state_sender: Sender; + let mut join_handle: JoinHandle<()>; + let mut task_queue: Arc>>; + let mut data: ThreadData; + + // Loop through and create all the required threads. + debug!("Spawning {} compute threads", amount); + for _i in 0..amount + { + // Create the channel to shutdown the thread. + shutdown_channel = channel::(); + shutdown_receiver = shutdown_channel.1; + + // Create the channel to retrieve the + // status of the thread. + state_channel = channel::(); + state_sender = state_channel.0; + + // Clone the task queue for the thread. + task_queue = self.task_queue.clone(); + + // Create a new Thread. + join_handle = ::std::thread::spawn(move || + { + let mut check_messages: bool; + let mut continue_running: bool; + let mut current_state: ThreadState; + let mut current_task: Option; + + println!("Starting thread."); + + // Threads start off in the starting state. + current_state = ThreadState::Starting; + + // This thread will start off with no + // tasks to process. + current_task = None; + + // Run this thread until the scheduler decides + // to shut it down. + change_thread_state!(ThreadState::Idle, current_state, + state_sender); + continue_running = true; + while continue_running == true + { + println!("Running thread."); + process_compute_thread(&mut current_task, + &mut current_state, + &mut task_queue, + &state_sender); + + // Sleep the thread so that other threads + // get a chance to run + ::std::thread::sleep(Duration::new(0, 100)); + + // Check to see if this thread should be shutdown. + check_messages = true; + while check_messages == true + { + match shutdown_receiver.try_recv() + { + Ok(val) => + { + continue_running = !val; + } + + Err(error) => + { + match error + { + TryRecvError::Empty => + { + // No messages to handle. + check_messages = false; + debug!("There were no shutdown messages."); + } + + TryRecvError::Disconnected => + { + // We lost our connection to the + // shutdown channel. + warn!("{}", error); + } + } + } + } + } + } + + // This thread is finished. + println!("Shutting down thread."); + change_thread_state!(ThreadState::Idle, current_state, + state_sender); + } + ); + + // Create a new set of data for the + // thread we want to create. + data = ThreadData::new(join_handle, shutdown_channel.0, + state_channel.1); + + // Add the new ThreadData to the set + // of compute threads. + self.compute_threads.push(data); + } + } +} + +impl Drop for Scheduler +{ + fn drop(&mut self) + { + debug!("Destroying scheduler."); + + // Stop any threads that are running. + debug!("Stopping threads."); + for thread_data in self.compute_threads.iter_mut() + { + // Stop the thread. + thread_data.shutdown_sender.send(true); + } + + // Wait about 5 seconds to make sure all + // the threads have been stopped. + debug!("Giving threads a chance to end."); + ::std::thread::sleep(Duration::new(5, 0)); + } +} + + + +/// +fn process_compute_thread(current_task: &mut Option, + current_state: &mut ThreadState, + task_queue: &mut Arc>>, + state_sender: &Sender) +{ + let mut task_completed: bool; + + // No task was recently completed. + task_completed = false; + + // Make sure that this thread has a Task + // to currently work on. + match *current_task + { + Some(ref mut task) => + { + // Process the task this thread is + // currently working on. + task.process(); + + match task.state + { + TaskState::Finished => + { + task_completed = true; + } + + _ => + { + } + } + } + + None => + { + // This thread does not have a current task. + // Get another task to work on from the Queue. + match task_queue.lock() + { + Ok(ref mut guard) => + { + *current_task = guard.deref_mut().pop(); + + match *current_task + { + Some(_) => + { + debug!("Received a task."); + change_thread_state!(ThreadState::Processing, + *current_state, state_sender); + } + + None => + { + debug!("No new task to process."); + } + } + } + + Err(error) => + { + error!("{}", error); + } + } + + // If we don't have a task to process, + // then we may need to switch over to + // an idle state. + if current_task.is_none() + { + match *current_state + { + ThreadState::Idle => + { + // The thread is already sitting idle. + } + + _ => + { + // There is nothing for this thread + // to process, so mark the thread as idle. + change_thread_state!(ThreadState::Idle, *current_state, + state_sender); + } + } + } + } + } + + if task_completed == true + { + println!("Task completed."); + *current_task = None; + change_thread_state!(ThreadState::Idle, *current_state, + state_sender); } } diff --git a/src/status.rs b/src/status.rs new file mode 100644 index 0000000..ee4e988 --- /dev/null +++ b/src/status.rs @@ -0,0 +1,9 @@ +/// +pub enum Status +{ + /// + Waiting, + + /// + Finished +} diff --git a/src/task.rs b/src/task.rs index 0b3f403..3c3a811 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,8 +1,16 @@ +use std::cmp::Ordering; + +use ::task_state::TaskState; + + /// pub struct Task { /// - name: String + pub name: String, + + /// + pub state: TaskState } @@ -14,7 +22,50 @@ impl Task { Task { - name: String::from(task_name) + name: String::from(task_name), + state: TaskState::Starting } } + + /// + pub fn set_processing_func(&self) -> i8 + { + 0 + } + + /// + pub fn process(&mut self) + { + self.state = TaskState::Processing; + println!("Processing task."); + self.state = TaskState::Finished; + } +} + +impl PartialEq for Task +{ + fn eq(&self, other: &Self) -> bool + { + self.name.eq(&other.name) + } +} + +impl Eq for Task +{ +} + +impl PartialOrd for Task +{ + fn partial_cmp(&self, other: &Self) -> Option + { + Some(self.cmp(other)) + } +} + +impl Ord for Task +{ + fn cmp(&self, other: &Self) -> Ordering + { + self.name.cmp(&other.name) + } } diff --git a/src/task_state.rs b/src/task_state.rs new file mode 100644 index 0000000..10f4c11 --- /dev/null +++ b/src/task_state.rs @@ -0,0 +1,40 @@ +/// The different states a Task can go through during its lifetime. +pub enum TaskState +{ + /// The state that every Task starts in. + /// + /// The STARTING state can only lead to the processing state. + Starting, + + /// The state that each Task is in while it is actually + /// being process and running on a thread. + /// + /// The PROCESSING state can only lead to WAITING, + /// FINISHED, or UNKNOWN states. + Processing, + + /// The state a Task is in while it is waiting for child + /// tasks to switch to the FINISHED state. + /// + /// The WAITING state can only lead to the DONE_WAITING + /// and UNKNOWN states. + Waiting, + + /// The state a Task is in once it is done processing + /// child tasks, but prior to going back to the PROCESSING state. + /// + /// The DONE_WAITING state can only lead to the PROCESSING + /// and UNKNOWN states. + DoneWaiting, + + /// The state a Task will be in when it is FINISHED processing + /// and ready to be destroyed. + /// + /// The FINISHED state cannot lead to any states and shows that the + /// task is completed. + Finished, + + /// The state a Task will be placed in if it is detected to be + /// in an inproper state during its lifetime. + Unknown +} diff --git a/src/task_type.rs b/src/task_type.rs new file mode 100644 index 0000000..dd9d4e6 --- /dev/null +++ b/src/task_type.rs @@ -0,0 +1,10 @@ +/// +pub enum TaskType +{ + /// A task ment to get its own thread so that + /// it can run as long as it needs to. + Immortal, + + /// + Mortal +} diff --git a/src/thread.rs b/src/thread.rs index 02ff231..4d6c814 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,20 +1,193 @@ +use std::collections::binary_heap::BinaryHeap; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::thread::JoinHandle; +use std::time::Duration; +use std::vec::Vec; + +use ::task::Task; +use ::task_state::TaskState; +use ::thread_data::ThreadData; +use ::thread_state::ThreadState; + +use chrono::MonotonicTime; + + + /// pub struct Thread { /// - empty: i8 + state: ThreadState, + + /// + current_task: Option, + + /// + task_queue: Arc>>, + + /// + shutdown_receiver: Receiver, + + /// + state_sender: Sender } impl Thread { - /// - pub fn new() -> Thread + /// + pub fn new(queue: Arc>>, + shutdown_r: Receiver, + state_s: Sender) + -> Thread { Thread { - empty: 0 + state: ThreadState::Starting, + current_task: None, + task_queue: queue, + shutdown_receiver: shutdown_r, + state_sender: state_s + } + } + + /// + pub fn process(&mut self) + { + let mut task_completed: bool; + + // No task was recently completed. + task_completed = false; + + // Make sure that this thread has a Task + // to currently work on. + match self.current_task + { + Some(ref mut task) => + { + // Process the task this thread is + // currently working on. + task.process(); + + match task.state + { + TaskState::Finished => + { + task_completed = true; + } + + _ => + { + } + } + } + + None => + { + // This thread does not have a current task. + // Get another task to work on from the Queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + self.current_task = guard.deref_mut().pop(); + + match self.current_task + { + Some(_) => + { + debug!("Received a task."); + } + + None => + { + debug!("No new task to process."); + } + } + } + + Err(error) => + { + error!("{}", error); + } + } + + // If we don't have a task to process, + // then we may need to switch over to + // an idle state. + match self.current_task + { + Some(_) => + { + self.change_state(ThreadState::Processing); + } + + None => + { + match self.state + { + ThreadState::Idle => + { + // The thread is already sitting idle. + } + + _ => + { + // There is nothing for this thread + // to process, so mark the thread as idle. + self.change_state(ThreadState::Idle); + } + } + } + } + } + } + + if task_completed == true + { + println!("Task completed."); + self.current_task = None; + self.change_state(ThreadState::Idle); + } + } + + /// + fn change_state(&mut self, new_state: ThreadState) + { + self.state = new_state; + match self.state_sender.send(self.state) + { + Ok(_) => + { + } + + Err(error) => + { + // We lost our connection to the + // state channel. + warn!("{}", error); + } + } + } + + /// Queues a task to be processed. + fn queue_task(&mut self, task: Task) + { + // Just add the task to the queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + guard.deref_mut().push(task); + } + + Err(error) => + { + error!("{}", error); + } } } } diff --git a/src/thread_data.rs b/src/thread_data.rs new file mode 100644 index 0000000..6e79fca --- /dev/null +++ b/src/thread_data.rs @@ -0,0 +1,45 @@ +use std::sync::mpsc::{Sender, Receiver}; +use std::thread::JoinHandle; + +use ::thread_state::ThreadState; + +use chrono::MonotonicTime; + + + +/// +pub struct ThreadData +{ + /// + pub state: ThreadState, + + /// + pub handle: JoinHandle<()>, + + /// + pub idle_time: MonotonicTime, + + /// + pub shutdown_sender: Sender, + + /// + pub state_receiver: Receiver, +} + + + +impl ThreadData +{ + pub fn new(join_handle: JoinHandle<()>, shutdown_s: Sender, + state_r: Receiver) -> ThreadData + { + ThreadData + { + state: ThreadState::Starting, + handle: join_handle, + idle_time: MonotonicTime::current(), + shutdown_sender: shutdown_s, + state_receiver: state_r + } + } +} diff --git a/src/thread_state.rs b/src/thread_state.rs new file mode 100644 index 0000000..7a5f04d --- /dev/null +++ b/src/thread_state.rs @@ -0,0 +1,16 @@ +/// +#[derive(Clone, Copy)] +pub enum ThreadState +{ + /// + Starting, + + /// + Idle, + + /// + Processing, + + /// + Finished +}