Tasks can now be added to computer or blocking threads.

The tasks are now objects that implement Taskable.
This commit is contained in:
Jason Travis Smith 2016-04-01 15:12:44 -04:00
parent 82486b1634
commit 28ab99e867
9 changed files with 637 additions and 339 deletions

162
src/blocking_thread.rs Normal file
View File

@ -0,0 +1,162 @@
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use ::task::Task;
use ::task_state::TaskState;
use ::thread::Thread;
use ::thread_state::ThreadState;
///
pub struct BlockingThread
{
///
state: ThreadState,
///
current_task: Box<Task>,
///
shutdown_receiver: Receiver<bool>,
///
state_sender: Sender<ThreadState>,
///
continue_running: bool
}
impl BlockingThread
{
///
pub fn new(task: Box<Task>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> BlockingThread
{
BlockingThread
{
state: ThreadState::Starting,
current_task: task,
shutdown_receiver: shutdown_r,
state_sender: state_s,
continue_running: true
}
}
///
fn change_state(&mut self, new_state: ThreadState)
{
self.state = new_state;
match self.state_sender.send(self.state)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
/// Queues a task to be processed. For a
/// blocking thread this needs to queue a
/// new blocking thread task.
fn queue_task(&mut self, task: Box<Task>)
{
}
/// Check for any new thread shutdown messages.
fn process_shutdown_messages(&mut self)
{
let mut check_messages: bool;
// Loop through all the messages in the
// receivers queue.
check_messages = true;
while check_messages == true
{
match self.shutdown_receiver.try_recv()
{
Ok(val) =>
{
// Found a message.
self.continue_running = !val;
}
Err(error) =>
{
// Determine the kind of error we received.
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
// TODO: Handle this poisoning correctly.
warn!("{}", error);
}
}
}
}
}
}
}
impl Thread for BlockingThread
{
///
fn process(&mut self)
{
// Run this thread until the scheduler decides
// to shut it down.
self.change_state(ThreadState::Idle);
self.change_state(ThreadState::Processing);
while self.continue_running == true
{
// Process the task this thread is
// currently working on.
self.current_task.process();
match self.current_task.get_state()
{
TaskState::Finished =>
{
self.continue_running = false;
}
_ =>
{
}
}
// Sleep the thread so that other threads
// get a chance to run.
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
if self.continue_running == true
{
self.process_shutdown_messages();
}
}
// This thread is finished.
println!("Shutting down thread.");
self.change_state(ThreadState::Finished);
}
}

274
src/compute_thread.rs Normal file
View File

@ -0,0 +1,274 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use ::task::Task;
use ::task_state::TaskState;
use ::thread::Thread;
use ::thread_state::ThreadState;
///
pub struct ComputeThread
{
///
state: ThreadState,
///
current_task: Option<Box<Task>>,
///
task_queue: Arc<Mutex<BinaryHeap<Box<Task>>>>,
///
shutdown_receiver: Receiver<bool>,
///
state_sender: Sender<ThreadState>,
///
continue_running: bool,
}
impl ComputeThread
{
///
pub fn new(queue: Arc<Mutex<BinaryHeap<Box<Task>>>>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> ComputeThread
{
ComputeThread
{
state: ThreadState::Starting,
current_task: None,
task_queue: queue,
shutdown_receiver: shutdown_r,
state_sender: state_s,
continue_running: true
}
}
///
fn change_state(&mut self, new_state: ThreadState)
{
self.state = new_state;
match self.state_sender.send(self.state)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
///
fn retrieve_task(&mut self)
{
// There is nothing to do if this thread already
// has a task to work on.
if self.current_task.is_some() == true
{
return;
}
// This thread does not have a current task.
// Get another task to work on from the Queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
self.current_task = guard.deref_mut().pop();
match self.current_task
{
Some(_) =>
{
debug!("Received a task.");
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Queues a task to be processed.
fn queue_task(&mut self, task: Box<Task>)
{
// Just add the task to the queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
guard.deref_mut().push(task);
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Check for any new thread shutdown messages.
fn process_shutdown_messages(&mut self)
{
let mut check_messages: bool;
// Loop through all the messages in the
// receivers queue.
check_messages = true;
while check_messages == true
{
match self.shutdown_receiver.try_recv()
{
Ok(val) =>
{
// Found a message.
self.continue_running = !val;
}
Err(error) =>
{
// Determine the kind of error we received.
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
// TODO: Handle this poisoning correctly.
warn!("{}", error);
}
}
}
}
}
}
}
impl Thread for ComputeThread
{
///
fn process(&mut self)
{
let mut check_messages: bool;
let mut task_completed: bool;
// Run this thread until the scheduler decides
// to shut it down.
self.change_state(ThreadState::Idle);
while self.continue_running == true
{
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
{
Some(ref mut task) =>
{
// Process the task this thread is
// currently working on.
task.process();
match task.get_state()
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
}
None =>
{
// Try to get a task to work on.
self.retrieve_task();
match self.current_task
{
Some(_) =>
{
// We have a task to work on, so switch to
// a Processing state.
self.change_state(ThreadState::Processing);
}
None =>
{
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
match self.state
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
self.change_state(ThreadState::Idle);
}
}
}
}
}
}
// Check to see if the task this thread
// was processing was completed.
if task_completed == true
{
println!("Task completed.");
self.current_task = None;
self.change_state(ThreadState::Idle);
}
// Sleep the thread so that other threads
// get a chance to run
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
}
// This thread is finished.
println!("Shutting down thread.");
self.change_state(ThreadState::Finished);
}
}

View File

@ -10,7 +10,11 @@ extern crate chrono;
mod scheduler; mod scheduler;
mod task; mod task;
mod taskable;
mod task_container;
mod task_state; mod task_state;
mod blocking_thread;
mod compute_thread;
mod thread; mod thread;
mod thread_data; mod thread_data;
mod thread_state; mod thread_state;
@ -18,5 +22,4 @@ mod thread_state;
pub use self::scheduler::Scheduler; pub use self::scheduler::Scheduler;
pub use self::task::Task; pub use self::taskable::Taskable;
pub use self::task_state::TaskState;

View File

@ -6,7 +6,13 @@ use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use std::vec::Vec; use std::vec::Vec;
use chrono::MonotonicTime;
use ::task::Task; use ::task::Task;
use ::taskable::Taskable;
use ::task_container::TaskContainer;
use ::blocking_thread::BlockingThread;
use ::compute_thread::ComputeThread;
use ::thread::Thread; use ::thread::Thread;
use ::thread_data::ThreadData; use ::thread_data::ThreadData;
use ::thread_state::ThreadState; use ::thread_state::ThreadState;
@ -36,7 +42,7 @@ pub struct Scheduler
blocking_threads: Vec<ThreadData>, blocking_threads: Vec<ThreadData>,
/// The main task queue that threads can use to process tasks. /// The main task queue that threads can use to process tasks.
task_queue: Arc<Mutex<BinaryHeap<Task>>> task_queue: Arc<Mutex<BinaryHeap<Box<Task>>>>
} }
@ -86,6 +92,7 @@ impl Scheduler
debug!("Creating Scheduler with {} max threads.", max_count); debug!("Creating Scheduler with {} max threads.", max_count);
// Create the new scheduler.
Scheduler Scheduler
{ {
minimum_thread_amount: min_count, minimum_thread_amount: min_count,
@ -98,14 +105,19 @@ impl Scheduler
} }
/// Queues a task to be processed. /// Queues a task to be processed.
pub fn queue_task(&mut self, task: Task) pub fn queue_task<T>(&mut self, task: T) where T: Taskable + 'static
{ {
let container: TaskContainer<T>;
// Create the container to hold the task.
container = TaskContainer::new(task);
// Just add the task to the queue. // Just add the task to the queue.
match self.task_queue.lock() match self.task_queue.lock()
{ {
Ok(ref mut guard) => Ok(ref mut guard) =>
{ {
guard.deref_mut().push(task); guard.deref_mut().push(Box::new(container));
} }
Err(error) => Err(error) =>
@ -115,6 +127,25 @@ impl Scheduler
} }
} }
/// Queues a task, that is known to block, to be processed.
pub fn queue_blocking_task<T>(&mut self, task: T) where T: Taskable+ 'static
{
let data: ThreadData;
let container: TaskContainer<T>;
// Create the container to hold the task.
container = TaskContainer::new(task);
// Spawn a new blocking thread
// and process this task on it.
debug!("Spawning blocking thread.");
// Add the new ThreadData to the set
// of compute threads.
data = self.spawn_blocking_thread(Box::new(container));
self.blocking_threads.push(data);
}
/// Processing the currently queued tasks. /// Processing the currently queued tasks.
/// ///
/// If this is not a single iteration, then it will block /// If this is not a single iteration, then it will block
@ -141,6 +172,10 @@ impl Scheduler
// to process the current tasks. // to process the current tasks.
self.spawn_compute_threads(new_thread_count); self.spawn_compute_threads(new_thread_count);
// Check to see if there are any compute threads
// that need to be destroyed.
self.trim_compute_threads();
// Check to see if this was to be a single iteration // Check to see if this was to be a single iteration
// of the task scheduler. // of the task scheduler.
if is_single_iteration == true if is_single_iteration == true
@ -149,12 +184,10 @@ impl Scheduler
// so stop the processing. // so stop the processing.
continue_processing = false; continue_processing = false;
} }
// else
// { // Let the thread sleep before
// Let the thread sleep before // handling the next iteration.
// handling the next iteration. ::std::thread::sleep(Duration::new(0, 100));
::std::thread::sleep(Duration::new(0, 100));
// }
} }
} }
@ -247,45 +280,83 @@ impl Scheduler
new_thread_count new_thread_count
} }
///
fn trim_compute_threads(&mut self)
{
}
/// ///
fn spawn_compute_threads(&mut self, amount: u64) fn spawn_compute_threads(&mut self, amount: u64)
{ {
let mut shutdown_channel: (Sender<bool>, Receiver<bool>);
let mut state_channel: (Sender<ThreadState>, Receiver<ThreadState>);
let mut join_handle: JoinHandle<()>;
let mut data: ThreadData; let mut data: ThreadData;
let mut new_thread: Thread;
// Loop through and create all the required threads. // Loop through and create all the required threads.
debug!("Spawning {} compute threads", amount); debug!("Spawning {} compute threads", amount);
for _i in 0..amount for _i in 0..amount
{ {
// Create the channel to shutdown the thread.
shutdown_channel = channel::<bool>();
// Create the channel to retrieve the
// status of the thread.
state_channel = channel::<ThreadState>();
// Create a new Thread.
new_thread = Thread::new(self.task_queue.clone(),
shutdown_channel.1, state_channel.0);
join_handle = ::std::thread::spawn(move ||
{
new_thread.process();
}
);
// Create a new set of data for the
// thread we want to create.
data = ThreadData::new(join_handle, shutdown_channel.0,
state_channel.1);
// Add the new ThreadData to the set // Add the new ThreadData to the set
// of compute threads. // of compute threads.
data = self.spawn_compute_thread();
self.compute_threads.push(data); self.compute_threads.push(data);
} }
} }
fn spawn_compute_thread(&self) -> ThreadData
{
let shutdown_channel: (Sender<bool>, Receiver<bool>);
let state_channel: (Sender<ThreadState>, Receiver<ThreadState>);
let join_handle: JoinHandle<()>;
let mut new_thread: ComputeThread;
// Create the channel to shutdown the thread.
shutdown_channel = channel::<bool>();
// Create the channel to retrieve the
// status of the thread.
state_channel = channel::<ThreadState>();
// Create a new Thread.
new_thread = ComputeThread::new(self.task_queue.clone(),
shutdown_channel.1, state_channel.0);
join_handle = ::std::thread::spawn(move ||
{
new_thread.process();
}
);
// Create a new set of data for the
// thread we want to create.
ThreadData::new(join_handle, shutdown_channel.0, state_channel.1)
}
fn spawn_blocking_thread(&self, task: Box<Task>) -> ThreadData
{
let shutdown_channel: (Sender<bool>, Receiver<bool>);
let state_channel: (Sender<ThreadState>, Receiver<ThreadState>);
let join_handle: JoinHandle<()>;
let mut new_thread: BlockingThread;
// Create the channel to shutdown the thread.
shutdown_channel = channel::<bool>();
// Create the channel to retrieve the
// status of the thread.
state_channel = channel::<ThreadState>();
// Create a new Thread.
new_thread = BlockingThread::new(task, shutdown_channel.1,
state_channel.0);
join_handle = ::std::thread::spawn(move ||
{
new_thread.process();
}
);
// Create a new set of data for the
// thread we want to create.
ThreadData::new(join_handle, shutdown_channel.0, state_channel.1)
}
} }
impl Drop for Scheduler impl Drop for Scheduler
@ -295,7 +366,24 @@ impl Drop for Scheduler
debug!("Destroying scheduler."); debug!("Destroying scheduler.");
// Stop any threads that are running. // Stop any threads that are running.
debug!("Stopping threads."); debug!("Stopping blocking threads.");
for thread_data in self.blocking_threads.iter_mut()
{
// Stop the thread.
match thread_data.shutdown_sender.send(true)
{
Ok(_) =>
{
}
Err(error) =>
{
warn!("{}", error);
}
}
}
debug!("Stopping compute threads.");
for thread_data in self.compute_threads.iter_mut() for thread_data in self.compute_threads.iter_mut()
{ {
// Stop the thread. // Stop the thread.

View File

@ -3,50 +3,22 @@ use std::cmp::Ordering;
use ::task_state::TaskState; use ::task_state::TaskState;
///
pub struct Task
{
///
pub name: String,
/// pub trait Task: Send + Sync
pub state: TaskState {
fn get_name(&self) -> &str;
fn get_state(&self) -> TaskState;
fn process(&mut self);
} }
impl Task
{
///
pub fn new(task_name: &str) -> Task
{
Task
{
name: String::from(task_name),
state: TaskState::Starting
}
}
///
pub fn set_processing_func(&self) -> i8
{
0
}
///
pub fn process(&mut self)
{
self.state = TaskState::Processing;
println!("Processing task.");
self.state = TaskState::Finished;
}
}
impl PartialEq for Task impl PartialEq for Task
{ {
fn eq(&self, other: &Self) -> bool fn eq(&self, other: &Self) -> bool
{ {
self.name.eq(&other.name) self.get_name().to_string().eq(&other.get_name().to_string())
} }
} }
@ -66,6 +38,6 @@ impl Ord for Task
{ {
fn cmp(&self, other: &Self) -> Ordering fn cmp(&self, other: &Self) -> Ordering
{ {
self.name.cmp(&other.name) self.get_name().to_string().cmp(&other.get_name().to_string())
} }
} }

53
src/task_container.rs Normal file
View File

@ -0,0 +1,53 @@
use ::task::Task;
use ::taskable::Taskable;
use ::task_state::TaskState;
///
pub struct TaskContainer<T>
{
///
pub state: TaskState,
///
pub task: T
}
impl<T> TaskContainer<T> where T: Taskable
{
///
pub fn new(task: T) -> TaskContainer<T>
{
TaskContainer
{
state: TaskState::Starting,
task: task
}
}
}
impl<T> Task for TaskContainer<T> where T: Taskable
{
///
fn get_name(&self) -> &str
{
self.task.get_name()
}
///
fn get_state(&self) -> TaskState
{
self.state
}
///
fn process(&mut self)
{
self.state = TaskState::Processing;
println!("Processing task.");
self.state = TaskState::Finished;
}
}

View File

@ -1,4 +1,5 @@
/// The different states a Task can go through during its lifetime. /// The different states a Task can go through during its lifetime.
#[derive(Clone, Copy)]
pub enum TaskState pub enum TaskState
{ {
/// The state that every Task starts in. /// The state that every Task starts in.

9
src/taskable.rs Normal file
View File

@ -0,0 +1,9 @@
///
pub trait Taskable: Send + Sync
{
///
fn get_name(&self) -> &str;
///
fn process(&mut self);
}

View File

@ -1,270 +1,6 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use ::task::Task;
use ::task_state::TaskState;
use ::thread_state::ThreadState;
/// ///
pub struct Thread pub trait Thread: Send
{ {
/// ///
state: ThreadState, fn process(&mut self);
///
current_task: Option<Task>,
///
task_queue: Arc<Mutex<BinaryHeap<Task>>>,
///
shutdown_receiver: Receiver<bool>,
///
state_sender: Sender<ThreadState>,
///
continue_running: bool,
}
impl Thread
{
///
pub fn new(queue: Arc<Mutex<BinaryHeap<Task>>>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> Thread
{
Thread
{
state: ThreadState::Starting,
current_task: None,
task_queue: queue,
shutdown_receiver: shutdown_r,
state_sender: state_s,
continue_running: true
}
}
///
pub fn process(&mut self)
{
let mut check_messages: bool;
let mut task_completed: bool;
// Run this thread until the scheduler decides
// to shut it down.
self.change_state(ThreadState::Idle);
while self.continue_running == true
{
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
{
Some(ref mut task) =>
{
// Process the task this thread is
// currently working on.
task.process();
match task.state
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
}
None =>
{
// Try to get a task to work on.
self.retrieve_task();
match self.current_task
{
Some(_) =>
{
// We have a task to work on, so switch to
// a Processing state.
self.change_state(ThreadState::Processing);
}
None =>
{
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
match self.state
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
self.change_state(ThreadState::Idle);
}
}
}
}
}
}
// Check to see if the task this thread
// was processing was completed.
if task_completed == true
{
println!("Task completed.");
self.current_task = None;
self.change_state(ThreadState::Idle);
}
// Sleep the thread so that other threads
// get a chance to run
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
self.process_shutdown_messages();
}
// This thread is finished.
println!("Shutting down thread.");
self.change_state(ThreadState::Finished);
}
///
fn change_state(&mut self, new_state: ThreadState)
{
self.state = new_state;
match self.state_sender.send(self.state)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
///
fn retrieve_task(&mut self)
{
// There is nothing to do if this thread already
// has a task to work on.
if self.current_task.is_some() == true
{
return;
}
// This thread does not have a current task.
// Get another task to work on from the Queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
self.current_task = guard.deref_mut().pop();
match self.current_task
{
Some(_) =>
{
debug!("Received a task.");
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Queues a task to be processed.
fn queue_task(&mut self, task: Task)
{
// Just add the task to the queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
guard.deref_mut().push(task);
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Check for any new thread shutdown messages.
fn process_shutdown_messages(&mut self)
{
let mut check_messages: bool;
// Loop through all the messages in the
// receivers queue.
check_messages = true;
while check_messages == true
{
match self.shutdown_receiver.try_recv()
{
Ok(val) =>
{
// Found a message.
self.continue_running = !val;
}
Err(error) =>
{
// Determine the kind of error we received.
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
// TODO: Handle this poisoning correctly.
warn!("{}", error);
}
}
}
}
}
}
} }