diff --git a/src/blocking_thread.rs b/src/blocking_thread.rs new file mode 100644 index 0000000..ba7cf3e --- /dev/null +++ b/src/blocking_thread.rs @@ -0,0 +1,162 @@ +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::time::Duration; + +use ::task::Task; +use ::task_state::TaskState; +use ::thread::Thread; +use ::thread_state::ThreadState; + + + +/// +pub struct BlockingThread +{ + /// + state: ThreadState, + + /// + current_task: Box, + + /// + shutdown_receiver: Receiver, + + /// + state_sender: Sender, + + /// + continue_running: bool +} + + + +impl BlockingThread +{ + /// + pub fn new(task: Box, + shutdown_r: Receiver, + state_s: Sender) + -> BlockingThread + { + BlockingThread + { + state: ThreadState::Starting, + current_task: task, + shutdown_receiver: shutdown_r, + state_sender: state_s, + continue_running: true + } + } + + /// + fn change_state(&mut self, new_state: ThreadState) + { + self.state = new_state; + match self.state_sender.send(self.state) + { + Ok(_) => + { + } + + Err(error) => + { + // We lost our connection to the + // state channel. + warn!("{}", error); + } + } + } + + /// Queues a task to be processed. For a + /// blocking thread this needs to queue a + /// new blocking thread task. + fn queue_task(&mut self, task: Box) + { + } + + /// Check for any new thread shutdown messages. + fn process_shutdown_messages(&mut self) + { + let mut check_messages: bool; + + // Loop through all the messages in the + // receivers queue. + check_messages = true; + while check_messages == true + { + match self.shutdown_receiver.try_recv() + { + Ok(val) => + { + // Found a message. + self.continue_running = !val; + } + + Err(error) => + { + // Determine the kind of error we received. + match error + { + TryRecvError::Empty => + { + // No messages to handle. + check_messages = false; + debug!("There were no shutdown messages."); + } + + TryRecvError::Disconnected => + { + // We lost our connection to the + // shutdown channel. + // TODO: Handle this poisoning correctly. + warn!("{}", error); + } + } + } + } + } + } +} + +impl Thread for BlockingThread +{ + /// + fn process(&mut self) + { + // Run this thread until the scheduler decides + // to shut it down. + self.change_state(ThreadState::Idle); + self.change_state(ThreadState::Processing); + while self.continue_running == true + { + // Process the task this thread is + // currently working on. + self.current_task.process(); + + match self.current_task.get_state() + { + TaskState::Finished => + { + self.continue_running = false; + } + + _ => + { + } + } + + // Sleep the thread so that other threads + // get a chance to run. + ::std::thread::sleep(Duration::new(0, 100)); + + // Check to see if this thread should be shutdown. + if self.continue_running == true + { + self.process_shutdown_messages(); + } + } + + // This thread is finished. + println!("Shutting down thread."); + self.change_state(ThreadState::Finished); + } +} diff --git a/src/compute_thread.rs b/src/compute_thread.rs new file mode 100644 index 0000000..8871321 --- /dev/null +++ b/src/compute_thread.rs @@ -0,0 +1,274 @@ +use std::collections::binary_heap::BinaryHeap; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::time::Duration; + +use ::task::Task; +use ::task_state::TaskState; +use ::thread::Thread; +use ::thread_state::ThreadState; + + + +/// +pub struct ComputeThread +{ + /// + state: ThreadState, + + /// + current_task: Option>, + + /// + task_queue: Arc>>>, + + /// + shutdown_receiver: Receiver, + + /// + state_sender: Sender, + + /// + continue_running: bool, +} + + + +impl ComputeThread +{ + /// + pub fn new(queue: Arc>>>, + shutdown_r: Receiver, + state_s: Sender) + -> ComputeThread + { + ComputeThread + { + state: ThreadState::Starting, + current_task: None, + task_queue: queue, + shutdown_receiver: shutdown_r, + state_sender: state_s, + continue_running: true + } + } + + /// + fn change_state(&mut self, new_state: ThreadState) + { + self.state = new_state; + match self.state_sender.send(self.state) + { + Ok(_) => + { + } + + Err(error) => + { + // We lost our connection to the + // state channel. + warn!("{}", error); + } + } + } + + /// + fn retrieve_task(&mut self) + { + // There is nothing to do if this thread already + // has a task to work on. + if self.current_task.is_some() == true + { + return; + } + + // This thread does not have a current task. + // Get another task to work on from the Queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + self.current_task = guard.deref_mut().pop(); + + match self.current_task + { + Some(_) => + { + debug!("Received a task."); + } + + None => + { + debug!("No new task to process."); + } + } + } + + Err(error) => + { + error!("{}", error); + } + } + } + + /// Queues a task to be processed. + fn queue_task(&mut self, task: Box) + { + // Just add the task to the queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + guard.deref_mut().push(task); + } + + Err(error) => + { + error!("{}", error); + } + } + } + + /// Check for any new thread shutdown messages. + fn process_shutdown_messages(&mut self) + { + let mut check_messages: bool; + + // Loop through all the messages in the + // receivers queue. + check_messages = true; + while check_messages == true + { + match self.shutdown_receiver.try_recv() + { + Ok(val) => + { + // Found a message. + self.continue_running = !val; + } + + Err(error) => + { + // Determine the kind of error we received. + match error + { + TryRecvError::Empty => + { + // No messages to handle. + check_messages = false; + debug!("There were no shutdown messages."); + } + + TryRecvError::Disconnected => + { + // We lost our connection to the + // shutdown channel. + // TODO: Handle this poisoning correctly. + warn!("{}", error); + } + } + } + } + } + } +} + +impl Thread for ComputeThread +{ + /// + fn process(&mut self) + { + let mut check_messages: bool; + let mut task_completed: bool; + + // Run this thread until the scheduler decides + // to shut it down. + self.change_state(ThreadState::Idle); + while self.continue_running == true + { + // No task was recently completed. + task_completed = false; + + // Make sure that this thread has a Task + // to currently work on. + match self.current_task + { + Some(ref mut task) => + { + // Process the task this thread is + // currently working on. + task.process(); + + match task.get_state() + { + TaskState::Finished => + { + task_completed = true; + } + + _ => + { + } + } + } + + None => + { + // Try to get a task to work on. + self.retrieve_task(); + match self.current_task + { + Some(_) => + { + // We have a task to work on, so switch to + // a Processing state. + self.change_state(ThreadState::Processing); + } + + None => + { + // If we don't have a task to process, + // then we may need to switch over to + // an idle state. + match self.state + { + ThreadState::Idle => + { + // The thread is already sitting idle. + } + + _ => + { + // There is nothing for this thread + // to process, so mark the thread as idle. + self.change_state(ThreadState::Idle); + } + } + } + } + } + } + + // Check to see if the task this thread + // was processing was completed. + if task_completed == true + { + println!("Task completed."); + self.current_task = None; + self.change_state(ThreadState::Idle); + } + + // Sleep the thread so that other threads + // get a chance to run + ::std::thread::sleep(Duration::new(0, 100)); + + // Check to see if this thread should be shutdown. + self.process_shutdown_messages(); + } + + // This thread is finished. + println!("Shutting down thread."); + self.change_state(ThreadState::Finished); + } +} diff --git a/src/lib.rs b/src/lib.rs index 9de434e..638896f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,11 @@ extern crate chrono; mod scheduler; mod task; +mod taskable; +mod task_container; mod task_state; +mod blocking_thread; +mod compute_thread; mod thread; mod thread_data; mod thread_state; @@ -18,5 +22,4 @@ mod thread_state; pub use self::scheduler::Scheduler; -pub use self::task::Task; -pub use self::task_state::TaskState; +pub use self::taskable::Taskable; diff --git a/src/scheduler.rs b/src/scheduler.rs index 67a539e..42fad0b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -6,7 +6,13 @@ use std::thread::JoinHandle; use std::time::Duration; use std::vec::Vec; +use chrono::MonotonicTime; + use ::task::Task; +use ::taskable::Taskable; +use ::task_container::TaskContainer; +use ::blocking_thread::BlockingThread; +use ::compute_thread::ComputeThread; use ::thread::Thread; use ::thread_data::ThreadData; use ::thread_state::ThreadState; @@ -36,7 +42,7 @@ pub struct Scheduler blocking_threads: Vec, /// The main task queue that threads can use to process tasks. - task_queue: Arc>> + task_queue: Arc>>> } @@ -86,6 +92,7 @@ impl Scheduler debug!("Creating Scheduler with {} max threads.", max_count); + // Create the new scheduler. Scheduler { minimum_thread_amount: min_count, @@ -98,14 +105,19 @@ impl Scheduler } /// Queues a task to be processed. - pub fn queue_task(&mut self, task: Task) + pub fn queue_task(&mut self, task: T) where T: Taskable + 'static { + let container: TaskContainer; + + // Create the container to hold the task. + container = TaskContainer::new(task); + // Just add the task to the queue. match self.task_queue.lock() { Ok(ref mut guard) => { - guard.deref_mut().push(task); + guard.deref_mut().push(Box::new(container)); } Err(error) => @@ -115,6 +127,25 @@ impl Scheduler } } + /// Queues a task, that is known to block, to be processed. + pub fn queue_blocking_task(&mut self, task: T) where T: Taskable+ 'static + { + let data: ThreadData; + let container: TaskContainer; + + // Create the container to hold the task. + container = TaskContainer::new(task); + + // Spawn a new blocking thread + // and process this task on it. + debug!("Spawning blocking thread."); + + // Add the new ThreadData to the set + // of compute threads. + data = self.spawn_blocking_thread(Box::new(container)); + self.blocking_threads.push(data); + } + /// Processing the currently queued tasks. /// /// If this is not a single iteration, then it will block @@ -141,6 +172,10 @@ impl Scheduler // to process the current tasks. self.spawn_compute_threads(new_thread_count); + // Check to see if there are any compute threads + // that need to be destroyed. + self.trim_compute_threads(); + // Check to see if this was to be a single iteration // of the task scheduler. if is_single_iteration == true @@ -149,12 +184,10 @@ impl Scheduler // so stop the processing. continue_processing = false; } -// else -// { - // Let the thread sleep before - // handling the next iteration. - ::std::thread::sleep(Duration::new(0, 100)); -// } + + // Let the thread sleep before + // handling the next iteration. + ::std::thread::sleep(Duration::new(0, 100)); } } @@ -247,45 +280,83 @@ impl Scheduler new_thread_count } + /// + fn trim_compute_threads(&mut self) + { + + } + /// fn spawn_compute_threads(&mut self, amount: u64) { - let mut shutdown_channel: (Sender, Receiver); - let mut state_channel: (Sender, Receiver); - let mut join_handle: JoinHandle<()>; let mut data: ThreadData; - let mut new_thread: Thread; // Loop through and create all the required threads. debug!("Spawning {} compute threads", amount); for _i in 0..amount { - // Create the channel to shutdown the thread. - shutdown_channel = channel::(); - - // Create the channel to retrieve the - // status of the thread. - state_channel = channel::(); - - // Create a new Thread. - new_thread = Thread::new(self.task_queue.clone(), - shutdown_channel.1, state_channel.0); - join_handle = ::std::thread::spawn(move || - { - new_thread.process(); - } - ); - - // Create a new set of data for the - // thread we want to create. - data = ThreadData::new(join_handle, shutdown_channel.0, - state_channel.1); - // Add the new ThreadData to the set // of compute threads. + data = self.spawn_compute_thread(); self.compute_threads.push(data); } } + + fn spawn_compute_thread(&self) -> ThreadData + { + let shutdown_channel: (Sender, Receiver); + let state_channel: (Sender, Receiver); + let join_handle: JoinHandle<()>; + let mut new_thread: ComputeThread; + + // Create the channel to shutdown the thread. + shutdown_channel = channel::(); + + // Create the channel to retrieve the + // status of the thread. + state_channel = channel::(); + + // Create a new Thread. + new_thread = ComputeThread::new(self.task_queue.clone(), + shutdown_channel.1, state_channel.0); + join_handle = ::std::thread::spawn(move || + { + new_thread.process(); + } + ); + + // Create a new set of data for the + // thread we want to create. + ThreadData::new(join_handle, shutdown_channel.0, state_channel.1) + } + + fn spawn_blocking_thread(&self, task: Box) -> ThreadData + { + let shutdown_channel: (Sender, Receiver); + let state_channel: (Sender, Receiver); + let join_handle: JoinHandle<()>; + let mut new_thread: BlockingThread; + + // Create the channel to shutdown the thread. + shutdown_channel = channel::(); + + // Create the channel to retrieve the + // status of the thread. + state_channel = channel::(); + + // Create a new Thread. + new_thread = BlockingThread::new(task, shutdown_channel.1, + state_channel.0); + join_handle = ::std::thread::spawn(move || + { + new_thread.process(); + } + ); + + // Create a new set of data for the + // thread we want to create. + ThreadData::new(join_handle, shutdown_channel.0, state_channel.1) + } } impl Drop for Scheduler @@ -295,7 +366,24 @@ impl Drop for Scheduler debug!("Destroying scheduler."); // Stop any threads that are running. - debug!("Stopping threads."); + debug!("Stopping blocking threads."); + for thread_data in self.blocking_threads.iter_mut() + { + // Stop the thread. + match thread_data.shutdown_sender.send(true) + { + Ok(_) => + { + } + + Err(error) => + { + warn!("{}", error); + } + } + } + + debug!("Stopping compute threads."); for thread_data in self.compute_threads.iter_mut() { // Stop the thread. diff --git a/src/task.rs b/src/task.rs index 3c3a811..254b881 100644 --- a/src/task.rs +++ b/src/task.rs @@ -3,50 +3,22 @@ use std::cmp::Ordering; use ::task_state::TaskState; -/// -pub struct Task -{ - /// - pub name: String, - /// - pub state: TaskState +pub trait Task: Send + Sync +{ + fn get_name(&self) -> &str; + fn get_state(&self) -> TaskState; + + fn process(&mut self); } -impl Task -{ - /// - pub fn new(task_name: &str) -> Task - { - Task - { - name: String::from(task_name), - state: TaskState::Starting - } - } - - /// - pub fn set_processing_func(&self) -> i8 - { - 0 - } - - /// - pub fn process(&mut self) - { - self.state = TaskState::Processing; - println!("Processing task."); - self.state = TaskState::Finished; - } -} - impl PartialEq for Task { fn eq(&self, other: &Self) -> bool { - self.name.eq(&other.name) + self.get_name().to_string().eq(&other.get_name().to_string()) } } @@ -66,6 +38,6 @@ impl Ord for Task { fn cmp(&self, other: &Self) -> Ordering { - self.name.cmp(&other.name) + self.get_name().to_string().cmp(&other.get_name().to_string()) } } diff --git a/src/task_container.rs b/src/task_container.rs new file mode 100644 index 0000000..97ee6ac --- /dev/null +++ b/src/task_container.rs @@ -0,0 +1,53 @@ +use ::task::Task; +use ::taskable::Taskable; +use ::task_state::TaskState; + + + +/// +pub struct TaskContainer +{ + /// + pub state: TaskState, + + /// + pub task: T +} + + + +impl TaskContainer where T: Taskable +{ + /// + pub fn new(task: T) -> TaskContainer + { + TaskContainer + { + state: TaskState::Starting, + task: task + } + } +} + +impl Task for TaskContainer where T: Taskable +{ + /// + fn get_name(&self) -> &str + { + self.task.get_name() + } + + /// + fn get_state(&self) -> TaskState + { + self.state + } + + /// + fn process(&mut self) + { + self.state = TaskState::Processing; + println!("Processing task."); + self.state = TaskState::Finished; + } +} diff --git a/src/task_state.rs b/src/task_state.rs index bd39ea7..9adabfb 100644 --- a/src/task_state.rs +++ b/src/task_state.rs @@ -1,4 +1,5 @@ /// The different states a Task can go through during its lifetime. +#[derive(Clone, Copy)] pub enum TaskState { /// The state that every Task starts in. diff --git a/src/taskable.rs b/src/taskable.rs new file mode 100644 index 0000000..5ba83b9 --- /dev/null +++ b/src/taskable.rs @@ -0,0 +1,9 @@ +/// +pub trait Taskable: Send + Sync +{ + /// + fn get_name(&self) -> &str; + + /// + fn process(&mut self); +} diff --git a/src/thread.rs b/src/thread.rs index 49e2f55..b2bd7f7 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,270 +1,6 @@ -use std::collections::binary_heap::BinaryHeap; -use std::ops::DerefMut; -use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{Receiver, Sender, TryRecvError}; -use std::time::Duration; - -use ::task::Task; -use ::task_state::TaskState; -use ::thread_state::ThreadState; - - - /// -pub struct Thread +pub trait Thread: Send { /// - state: ThreadState, - - /// - current_task: Option, - - /// - task_queue: Arc>>, - - /// - shutdown_receiver: Receiver, - - /// - state_sender: Sender, - - /// - continue_running: bool, -} - - - -impl Thread -{ - /// - pub fn new(queue: Arc>>, - shutdown_r: Receiver, - state_s: Sender) - -> Thread - { - Thread - { - state: ThreadState::Starting, - current_task: None, - task_queue: queue, - shutdown_receiver: shutdown_r, - state_sender: state_s, - continue_running: true - } - } - - /// - pub fn process(&mut self) - { - let mut check_messages: bool; - let mut task_completed: bool; - - // Run this thread until the scheduler decides - // to shut it down. - self.change_state(ThreadState::Idle); - while self.continue_running == true - { - // No task was recently completed. - task_completed = false; - - // Make sure that this thread has a Task - // to currently work on. - match self.current_task - { - Some(ref mut task) => - { - // Process the task this thread is - // currently working on. - task.process(); - - match task.state - { - TaskState::Finished => - { - task_completed = true; - } - - _ => - { - } - } - } - - None => - { - // Try to get a task to work on. - self.retrieve_task(); - match self.current_task - { - Some(_) => - { - // We have a task to work on, so switch to - // a Processing state. - self.change_state(ThreadState::Processing); - } - - None => - { - // If we don't have a task to process, - // then we may need to switch over to - // an idle state. - match self.state - { - ThreadState::Idle => - { - // The thread is already sitting idle. - } - - _ => - { - // There is nothing for this thread - // to process, so mark the thread as idle. - self.change_state(ThreadState::Idle); - } - } - } - } - } - } - - // Check to see if the task this thread - // was processing was completed. - if task_completed == true - { - println!("Task completed."); - self.current_task = None; - self.change_state(ThreadState::Idle); - } - - // Sleep the thread so that other threads - // get a chance to run - ::std::thread::sleep(Duration::new(0, 100)); - - // Check to see if this thread should be shutdown. - self.process_shutdown_messages(); - } - - // This thread is finished. - println!("Shutting down thread."); - self.change_state(ThreadState::Finished); - } - - /// - fn change_state(&mut self, new_state: ThreadState) - { - self.state = new_state; - match self.state_sender.send(self.state) - { - Ok(_) => - { - } - - Err(error) => - { - // We lost our connection to the - // state channel. - warn!("{}", error); - } - } - } - - /// - fn retrieve_task(&mut self) - { - // There is nothing to do if this thread already - // has a task to work on. - if self.current_task.is_some() == true - { - return; - } - - // This thread does not have a current task. - // Get another task to work on from the Queue. - match self.task_queue.lock() - { - Ok(ref mut guard) => - { - self.current_task = guard.deref_mut().pop(); - - match self.current_task - { - Some(_) => - { - debug!("Received a task."); - } - - None => - { - debug!("No new task to process."); - } - } - } - - Err(error) => - { - error!("{}", error); - } - } - } - - /// Queues a task to be processed. - fn queue_task(&mut self, task: Task) - { - // Just add the task to the queue. - match self.task_queue.lock() - { - Ok(ref mut guard) => - { - guard.deref_mut().push(task); - } - - Err(error) => - { - error!("{}", error); - } - } - } - - /// Check for any new thread shutdown messages. - fn process_shutdown_messages(&mut self) - { - let mut check_messages: bool; - - // Loop through all the messages in the - // receivers queue. - check_messages = true; - while check_messages == true - { - match self.shutdown_receiver.try_recv() - { - Ok(val) => - { - // Found a message. - self.continue_running = !val; - } - - Err(error) => - { - // Determine the kind of error we received. - match error - { - TryRecvError::Empty => - { - // No messages to handle. - check_messages = false; - debug!("There were no shutdown messages."); - } - - TryRecvError::Disconnected => - { - // We lost our connection to the - // shutdown channel. - // TODO: Handle this poisoning correctly. - warn!("{}", error); - } - } - } - } - } - } + fn process(&mut self); }