diff --git a/src/scheduler.rs b/src/scheduler.rs index c3c0798..67a539e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,19 +1,16 @@ use std::collections::binary_heap::BinaryHeap; -use std::ops::{Deref, DerefMut}; +use std::ops::DerefMut; use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread::JoinHandle; use std::time::Duration; use std::vec::Vec; use ::task::Task; -use ::task_state::TaskState; use ::thread::Thread; use ::thread_data::ThreadData; use ::thread_state::ThreadState; -use chrono::MonotonicTime; - /// @@ -44,31 +41,6 @@ pub struct Scheduler -macro_rules! change_thread_state -{ - ($state: expr, $state_holder: expr, $sender: expr) => - { - { - $state_holder = $state; - match $sender.send($state_holder) - { - Ok(_) => - { - } - - Err(error) => - { - // We lost our connection to the - // state channel. - warn!("{}", error); - } - } - } - } -} - - - impl Scheduler { /// @@ -181,7 +153,7 @@ impl Scheduler // { // Let the thread sleep before // handling the next iteration. - ::std::thread::sleep(Duration::new(1, 0)); + ::std::thread::sleep(Duration::new(0, 100)); // } } } @@ -202,8 +174,6 @@ impl Scheduler error!("{}", error); } } - - 0u64 } /// This will calculate the new amount of compute threads @@ -282,11 +252,9 @@ impl Scheduler { let mut shutdown_channel: (Sender, Receiver); let mut state_channel: (Sender, Receiver); - let mut shutdown_receiver: Receiver; - let mut state_sender: Sender; let mut join_handle: JoinHandle<()>; - let mut task_queue: Arc>>; let mut data: ThreadData; + let mut new_thread: Thread; // Loop through and create all the required threads. debug!("Spawning {} compute threads", amount); @@ -294,88 +262,17 @@ impl Scheduler { // Create the channel to shutdown the thread. shutdown_channel = channel::(); - shutdown_receiver = shutdown_channel.1; // Create the channel to retrieve the // status of the thread. state_channel = channel::(); - state_sender = state_channel.0; - - // Clone the task queue for the thread. - task_queue = self.task_queue.clone(); // Create a new Thread. + new_thread = Thread::new(self.task_queue.clone(), + shutdown_channel.1, state_channel.0); join_handle = ::std::thread::spawn(move || { - let mut check_messages: bool; - let mut continue_running: bool; - let mut current_state: ThreadState; - let mut current_task: Option; - - println!("Starting thread."); - - // Threads start off in the starting state. - current_state = ThreadState::Starting; - - // This thread will start off with no - // tasks to process. - current_task = None; - - // Run this thread until the scheduler decides - // to shut it down. - change_thread_state!(ThreadState::Idle, current_state, - state_sender); - continue_running = true; - while continue_running == true - { - println!("Running thread."); - process_compute_thread(&mut current_task, - &mut current_state, - &mut task_queue, - &state_sender); - - // Sleep the thread so that other threads - // get a chance to run - ::std::thread::sleep(Duration::new(0, 100)); - - // Check to see if this thread should be shutdown. - check_messages = true; - while check_messages == true - { - match shutdown_receiver.try_recv() - { - Ok(val) => - { - continue_running = !val; - } - - Err(error) => - { - match error - { - TryRecvError::Empty => - { - // No messages to handle. - check_messages = false; - debug!("There were no shutdown messages."); - } - - TryRecvError::Disconnected => - { - // We lost our connection to the - // shutdown channel. - warn!("{}", error); - } - } - } - } - } - } - - // This thread is finished. - println!("Shutting down thread."); - change_thread_state!(ThreadState::Idle, current_state, - state_sender); + new_thread.process(); } ); @@ -402,7 +299,17 @@ impl Drop for Scheduler for thread_data in self.compute_threads.iter_mut() { // Stop the thread. - thread_data.shutdown_sender.send(true); + match thread_data.shutdown_sender.send(true) + { + Ok(_) => + { + } + + Err(error) => + { + warn!("{}", error); + } + } } // Wait about 5 seconds to make sure all @@ -411,104 +318,3 @@ impl Drop for Scheduler ::std::thread::sleep(Duration::new(5, 0)); } } - - - -/// -fn process_compute_thread(current_task: &mut Option, - current_state: &mut ThreadState, - task_queue: &mut Arc>>, - state_sender: &Sender) -{ - let mut task_completed: bool; - - // No task was recently completed. - task_completed = false; - - // Make sure that this thread has a Task - // to currently work on. - match *current_task - { - Some(ref mut task) => - { - // Process the task this thread is - // currently working on. - task.process(); - - match task.state - { - TaskState::Finished => - { - task_completed = true; - } - - _ => - { - } - } - } - - None => - { - // This thread does not have a current task. - // Get another task to work on from the Queue. - match task_queue.lock() - { - Ok(ref mut guard) => - { - *current_task = guard.deref_mut().pop(); - - match *current_task - { - Some(_) => - { - debug!("Received a task."); - change_thread_state!(ThreadState::Processing, - *current_state, state_sender); - } - - None => - { - debug!("No new task to process."); - } - } - } - - Err(error) => - { - error!("{}", error); - } - } - - // If we don't have a task to process, - // then we may need to switch over to - // an idle state. - if current_task.is_none() - { - match *current_state - { - ThreadState::Idle => - { - // The thread is already sitting idle. - } - - _ => - { - // There is nothing for this thread - // to process, so mark the thread as idle. - change_thread_state!(ThreadState::Idle, *current_state, - state_sender); - } - } - } - } - } - - if task_completed == true - { - println!("Task completed."); - *current_task = None; - change_thread_state!(ThreadState::Idle, *current_state, - state_sender); - } -} diff --git a/src/thread.rs b/src/thread.rs index 4d6c814..49e2f55 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -1,18 +1,13 @@ use std::collections::binary_heap::BinaryHeap; -use std::ops::{Deref, DerefMut}; +use std::ops::DerefMut; use std::sync::{Arc, Mutex}; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; -use std::thread::JoinHandle; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::Duration; -use std::vec::Vec; use ::task::Task; use ::task_state::TaskState; -use ::thread_data::ThreadData; use ::thread_state::ThreadState; -use chrono::MonotonicTime; - /// @@ -31,7 +26,10 @@ pub struct Thread shutdown_receiver: Receiver, /// - state_sender: Sender + state_sender: Sender, + + /// + continue_running: bool, } @@ -50,108 +48,105 @@ impl Thread current_task: None, task_queue: queue, shutdown_receiver: shutdown_r, - state_sender: state_s + state_sender: state_s, + continue_running: true } } /// pub fn process(&mut self) { + let mut check_messages: bool; let mut task_completed: bool; - // No task was recently completed. - task_completed = false; - - // Make sure that this thread has a Task - // to currently work on. - match self.current_task + // Run this thread until the scheduler decides + // to shut it down. + self.change_state(ThreadState::Idle); + while self.continue_running == true { - Some(ref mut task) => + // No task was recently completed. + task_completed = false; + + // Make sure that this thread has a Task + // to currently work on. + match self.current_task { - // Process the task this thread is - // currently working on. - task.process(); - - match task.state + Some(ref mut task) => { - TaskState::Finished => - { - task_completed = true; - } + // Process the task this thread is + // currently working on. + task.process(); - _ => + match task.state { - } - } - } - - None => - { - // This thread does not have a current task. - // Get another task to work on from the Queue. - match self.task_queue.lock() - { - Ok(ref mut guard) => - { - self.current_task = guard.deref_mut().pop(); - - match self.current_task + TaskState::Finished => { - Some(_) => - { - debug!("Received a task."); - } + task_completed = true; + } - None => - { - debug!("No new task to process."); - } + _ => + { } } - - Err(error) => - { - error!("{}", error); - } } - // If we don't have a task to process, - // then we may need to switch over to - // an idle state. - match self.current_task + None => { - Some(_) => + // Try to get a task to work on. + self.retrieve_task(); + match self.current_task { - self.change_state(ThreadState::Processing); - } - - None => - { - match self.state + Some(_) => { - ThreadState::Idle => - { - // The thread is already sitting idle. - } + // We have a task to work on, so switch to + // a Processing state. + self.change_state(ThreadState::Processing); + } - _ => + None => + { + // If we don't have a task to process, + // then we may need to switch over to + // an idle state. + match self.state { - // There is nothing for this thread - // to process, so mark the thread as idle. - self.change_state(ThreadState::Idle); + ThreadState::Idle => + { + // The thread is already sitting idle. + } + + _ => + { + // There is nothing for this thread + // to process, so mark the thread as idle. + self.change_state(ThreadState::Idle); + } } } } } } + + // Check to see if the task this thread + // was processing was completed. + if task_completed == true + { + println!("Task completed."); + self.current_task = None; + self.change_state(ThreadState::Idle); + } + + // Sleep the thread so that other threads + // get a chance to run + ::std::thread::sleep(Duration::new(0, 100)); + + // Check to see if this thread should be shutdown. + self.process_shutdown_messages(); } - if task_completed == true - { - println!("Task completed."); - self.current_task = None; - self.change_state(ThreadState::Idle); - } + // This thread is finished. + println!("Shutting down thread."); + self.change_state(ThreadState::Finished); } /// @@ -173,6 +168,45 @@ impl Thread } } + /// + fn retrieve_task(&mut self) + { + // There is nothing to do if this thread already + // has a task to work on. + if self.current_task.is_some() == true + { + return; + } + + // This thread does not have a current task. + // Get another task to work on from the Queue. + match self.task_queue.lock() + { + Ok(ref mut guard) => + { + self.current_task = guard.deref_mut().pop(); + + match self.current_task + { + Some(_) => + { + debug!("Received a task."); + } + + None => + { + debug!("No new task to process."); + } + } + } + + Err(error) => + { + error!("{}", error); + } + } + } + /// Queues a task to be processed. fn queue_task(&mut self, task: Task) { @@ -190,4 +224,47 @@ impl Thread } } } + + /// Check for any new thread shutdown messages. + fn process_shutdown_messages(&mut self) + { + let mut check_messages: bool; + + // Loop through all the messages in the + // receivers queue. + check_messages = true; + while check_messages == true + { + match self.shutdown_receiver.try_recv() + { + Ok(val) => + { + // Found a message. + self.continue_running = !val; + } + + Err(error) => + { + // Determine the kind of error we received. + match error + { + TryRecvError::Empty => + { + // No messages to handle. + check_messages = false; + debug!("There were no shutdown messages."); + } + + TryRecvError::Disconnected => + { + // We lost our connection to the + // shutdown channel. + // TODO: Handle this poisoning correctly. + warn!("{}", error); + } + } + } + } + } + } }