Pulled out the thread code to it's own class.

This commit is contained in:
Jason Travis Smith 2016-03-25 05:41:50 -04:00
parent 01f9b49498
commit d69f9b8e9b
2 changed files with 174 additions and 291 deletions

View File

@ -1,19 +1,16 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::JoinHandle;
use std::time::Duration;
use std::vec::Vec;
use ::task::Task;
use ::task_state::TaskState;
use ::thread::Thread;
use ::thread_data::ThreadData;
use ::thread_state::ThreadState;
use chrono::MonotonicTime;
///
@ -44,31 +41,6 @@ pub struct Scheduler
macro_rules! change_thread_state
{
($state: expr, $state_holder: expr, $sender: expr) =>
{
{
$state_holder = $state;
match $sender.send($state_holder)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
}
}
impl Scheduler
{
///
@ -181,7 +153,7 @@ impl Scheduler
// {
// Let the thread sleep before
// handling the next iteration.
::std::thread::sleep(Duration::new(1, 0));
::std::thread::sleep(Duration::new(0, 100));
// }
}
}
@ -202,8 +174,6 @@ impl Scheduler
error!("{}", error);
}
}
0u64
}
/// This will calculate the new amount of compute threads
@ -282,11 +252,9 @@ impl Scheduler
{
let mut shutdown_channel: (Sender<bool>, Receiver<bool>);
let mut state_channel: (Sender<ThreadState>, Receiver<ThreadState>);
let mut shutdown_receiver: Receiver<bool>;
let mut state_sender: Sender<ThreadState>;
let mut join_handle: JoinHandle<()>;
let mut task_queue: Arc<Mutex<BinaryHeap<Task>>>;
let mut data: ThreadData;
let mut new_thread: Thread;
// Loop through and create all the required threads.
debug!("Spawning {} compute threads", amount);
@ -294,88 +262,17 @@ impl Scheduler
{
// Create the channel to shutdown the thread.
shutdown_channel = channel::<bool>();
shutdown_receiver = shutdown_channel.1;
// Create the channel to retrieve the
// status of the thread.
state_channel = channel::<ThreadState>();
state_sender = state_channel.0;
// Clone the task queue for the thread.
task_queue = self.task_queue.clone();
// Create a new Thread.
new_thread = Thread::new(self.task_queue.clone(),
shutdown_channel.1, state_channel.0);
join_handle = ::std::thread::spawn(move ||
{
let mut check_messages: bool;
let mut continue_running: bool;
let mut current_state: ThreadState;
let mut current_task: Option<Task>;
println!("Starting thread.");
// Threads start off in the starting state.
current_state = ThreadState::Starting;
// This thread will start off with no
// tasks to process.
current_task = None;
// Run this thread until the scheduler decides
// to shut it down.
change_thread_state!(ThreadState::Idle, current_state,
state_sender);
continue_running = true;
while continue_running == true
{
println!("Running thread.");
process_compute_thread(&mut current_task,
&mut current_state,
&mut task_queue,
&state_sender);
// Sleep the thread so that other threads
// get a chance to run
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
check_messages = true;
while check_messages == true
{
match shutdown_receiver.try_recv()
{
Ok(val) =>
{
continue_running = !val;
}
Err(error) =>
{
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
warn!("{}", error);
}
}
}
}
}
}
// This thread is finished.
println!("Shutting down thread.");
change_thread_state!(ThreadState::Idle, current_state,
state_sender);
new_thread.process();
}
);
@ -402,7 +299,17 @@ impl Drop for Scheduler
for thread_data in self.compute_threads.iter_mut()
{
// Stop the thread.
thread_data.shutdown_sender.send(true);
match thread_data.shutdown_sender.send(true)
{
Ok(_) =>
{
}
Err(error) =>
{
warn!("{}", error);
}
}
}
// Wait about 5 seconds to make sure all
@ -411,104 +318,3 @@ impl Drop for Scheduler
::std::thread::sleep(Duration::new(5, 0));
}
}
///
fn process_compute_thread(current_task: &mut Option<Task>,
current_state: &mut ThreadState,
task_queue: &mut Arc<Mutex<BinaryHeap<Task>>>,
state_sender: &Sender<ThreadState>)
{
let mut task_completed: bool;
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match *current_task
{
Some(ref mut task) =>
{
// Process the task this thread is
// currently working on.
task.process();
match task.state
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
}
None =>
{
// This thread does not have a current task.
// Get another task to work on from the Queue.
match task_queue.lock()
{
Ok(ref mut guard) =>
{
*current_task = guard.deref_mut().pop();
match *current_task
{
Some(_) =>
{
debug!("Received a task.");
change_thread_state!(ThreadState::Processing,
*current_state, state_sender);
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
if current_task.is_none()
{
match *current_state
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
change_thread_state!(ThreadState::Idle, *current_state,
state_sender);
}
}
}
}
}
if task_completed == true
{
println!("Task completed.");
*current_task = None;
change_thread_state!(ThreadState::Idle, *current_state,
state_sender);
}
}

View File

@ -1,18 +1,13 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::JoinHandle;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use std::vec::Vec;
use ::task::Task;
use ::task_state::TaskState;
use ::thread_data::ThreadData;
use ::thread_state::ThreadState;
use chrono::MonotonicTime;
///
@ -31,7 +26,10 @@ pub struct Thread
shutdown_receiver: Receiver<bool>,
///
state_sender: Sender<ThreadState>
state_sender: Sender<ThreadState>,
///
continue_running: bool,
}
@ -50,108 +48,105 @@ impl Thread
current_task: None,
task_queue: queue,
shutdown_receiver: shutdown_r,
state_sender: state_s
state_sender: state_s,
continue_running: true
}
}
///
pub fn process(&mut self)
{
let mut check_messages: bool;
let mut task_completed: bool;
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
// Run this thread until the scheduler decides
// to shut it down.
self.change_state(ThreadState::Idle);
while self.continue_running == true
{
Some(ref mut task) =>
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
{
// Process the task this thread is
// currently working on.
task.process();
match task.state
Some(ref mut task) =>
{
TaskState::Finished =>
{
task_completed = true;
}
// Process the task this thread is
// currently working on.
task.process();
_ =>
match task.state
{
}
}
}
None =>
{
// This thread does not have a current task.
// Get another task to work on from the Queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
self.current_task = guard.deref_mut().pop();
match self.current_task
TaskState::Finished =>
{
Some(_) =>
{
debug!("Received a task.");
}
task_completed = true;
}
None =>
{
debug!("No new task to process.");
}
_ =>
{
}
}
Err(error) =>
{
error!("{}", error);
}
}
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
match self.current_task
None =>
{
Some(_) =>
// Try to get a task to work on.
self.retrieve_task();
match self.current_task
{
self.change_state(ThreadState::Processing);
}
None =>
{
match self.state
Some(_) =>
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
// We have a task to work on, so switch to
// a Processing state.
self.change_state(ThreadState::Processing);
}
_ =>
None =>
{
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
match self.state
{
// There is nothing for this thread
// to process, so mark the thread as idle.
self.change_state(ThreadState::Idle);
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
self.change_state(ThreadState::Idle);
}
}
}
}
}
}
// Check to see if the task this thread
// was processing was completed.
if task_completed == true
{
println!("Task completed.");
self.current_task = None;
self.change_state(ThreadState::Idle);
}
// Sleep the thread so that other threads
// get a chance to run
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
}
if task_completed == true
{
println!("Task completed.");
self.current_task = None;
self.change_state(ThreadState::Idle);
}
// This thread is finished.
println!("Shutting down thread.");
self.change_state(ThreadState::Finished);
}
///
@ -173,6 +168,45 @@ impl Thread
}
}
///
fn retrieve_task(&mut self)
{
// There is nothing to do if this thread already
// has a task to work on.
if self.current_task.is_some() == true
{
return;
}
// This thread does not have a current task.
// Get another task to work on from the Queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
self.current_task = guard.deref_mut().pop();
match self.current_task
{
Some(_) =>
{
debug!("Received a task.");
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Queues a task to be processed.
fn queue_task(&mut self, task: Task)
{
@ -190,4 +224,47 @@ impl Thread
}
}
}
/// Check for any new thread shutdown messages.
fn process_shutdown_messages(&mut self)
{
let mut check_messages: bool;
// Loop through all the messages in the
// receivers queue.
check_messages = true;
while check_messages == true
{
match self.shutdown_receiver.try_recv()
{
Ok(val) =>
{
// Found a message.
self.continue_running = !val;
}
Err(error) =>
{
// Determine the kind of error we received.
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
// TODO: Handle this poisoning correctly.
warn!("{}", error);
}
}
}
}
}
}
}