Checking this in before I pull the thread code out to its own file.

This commit is contained in:
Jason Travis Smith 2016-03-25 02:45:34 -04:00
parent b83c3970a6
commit 01f9b49498
13 changed files with 862 additions and 31 deletions

14
Cargo.lock generated
View File

@ -2,11 +2,23 @@
name = "apprentice" name = "apprentice"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono 0.9.0 (git+https://gitlab.com/CyberMages/chrono.git)",
"scribe 0.1.0 (git+https://gitlab.com/CyberMages/scribe.git)", "scribe 0.1.0 (git+https://gitlab.com/CyberMages/scribe.git)",
"spellbook 0.1.0 (git+https://gitlab.com/CyberMages/spellbook.git)",
] ]
[[package]]
name = "chrono"
version = "0.9.0"
source = "git+https://gitlab.com/CyberMages/chrono.git#10f1633ac1334cb6ada86222c68efeea89141be7"
[[package]] [[package]]
name = "scribe" name = "scribe"
version = "0.1.0" version = "0.1.0"
source = "git+https://gitlab.com/CyberMages/scribe.git#d6a6f5107c8d03b13e081c9378486781e31daa4e" source = "git+https://gitlab.com/CyberMages/scribe.git#e52418d3bfc28cd1f03cc7f31af06fce2e03f844"
[[package]]
name = "spellbook"
version = "0.1.0"
source = "git+https://gitlab.com/CyberMages/spellbook.git#f8526d248f3b7eb4f15f672368ac45a64c4eacbd"

View File

@ -9,5 +9,11 @@ documentation = ""
keywords = ["scheduler", "task", "thread"] keywords = ["scheduler", "task", "thread"]
[dependencies.chrono]
git = "https://gitlab.com/CyberMages/chrono.git"
[dependencies.scribe] [dependencies.scribe]
git = "https://gitlab.com/CyberMages/scribe.git" git = "https://gitlab.com/CyberMages/scribe.git"
[dependencies.spellbook]
git = "https://gitlab.com/CyberMages/spellbook.git"

1
README.md Normal file
View File

@ -0,0 +1 @@
# Apprentice #

View File

@ -8,5 +8,11 @@ use apprentice::*;
pub fn main() pub fn main()
{ {
println!("Hello world"); let mut scheduler: Scheduler;
scheduler = Scheduler::new(None, Some(4));
scheduler.queue_task(Task::new("Test"));
scheduler.process_tasks(true);
scheduler.process_tasks(true);
//scheduler.process_tasks(true);
} }

View File

@ -1,14 +1,22 @@
#[macro_use] #[macro_use]
extern crate scribe; extern crate scribe;
#[macro_use]
extern crate spellbook;
extern crate chrono;
mod scheduler; mod scheduler;
mod task; mod task;
mod task_state;
mod thread; mod thread;
mod thread_data;
mod thread_state;
pub use self::scheduler::Scheduler; pub use self::scheduler::Scheduler;
pub use self::task::Task; pub use self::task::Task;
pub use self::thread::Thread; pub use self::task_state::TaskState;

View File

@ -1,18 +1,70 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::JoinHandle;
use std::time::Duration;
use std::vec::Vec;
use ::task::Task;
use ::task_state::TaskState;
use ::thread::Thread; use ::thread::Thread;
use ::thread_data::ThreadData;
use ::thread_state::ThreadState;
use chrono::MonotonicTime;
/// ///
pub struct Scheduler pub struct Scheduler
{ {
/// /// The minimum amount of compute threads that the
has_unlimited_threads: bool, /// scheduler will keep alive.
minimum_thread_amount: u64,
/// /// The maximum amount of compute threads that the
/// scheduler will have alive at one given time.
maximum_thread_amount: u64, maximum_thread_amount: u64,
/// /// Specify the maximum amount of time that a thread
thread_list: Vec<Thread> /// can stay idle before it is marked to be killed.
maximum_thread_idle_time: Duration,
/// The threads created to handle
/// non-blocking computation tasks.
compute_threads: Vec<ThreadData>,
/// The threads created to handle blocking tasks.
blocking_threads: Vec<ThreadData>,
/// The main task queue that threads can use to process tasks.
task_queue: Arc<Mutex<BinaryHeap<Task>>>
}
macro_rules! change_thread_state
{
($state: expr, $state_holder: expr, $sender: expr) =>
{
{
$state_holder = $state;
match $sender.send($state_holder)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
}
} }
@ -20,41 +72,443 @@ pub struct Scheduler
impl Scheduler impl Scheduler
{ {
/// ///
pub fn new(num_threads: Option<u64>) -> Scheduler pub fn new(min_threads: Option<u64>, max_threads: Option<u64>) -> Scheduler
{ {
let thread_count: u64; let min_count: u64;
let unlimited_threads: bool; let max_count: u64;
// Check to see if the scheduler will // See if there is a minimum number of threads that
// be using unlimited threads. // the scheduler is supposed to keep around.
match num_threads match min_threads
{
Some(count) =>
{
min_count = count;
}
None =>
{
min_count = 0;
}
}
// Get the maximum amount of threads that the
// scheduler can create. This is for compute threads
// since I/O threads will block.
match max_threads
{ {
Some(count) => Some(count) =>
{ {
// Set the maximum number of threads to be the desired // Set the maximum number of threads to be the desired
// amount. // amount.
thread_count = count; max_count = count;
unlimited_threads = false;
} }
None => None =>
{ {
// Set that there can be an unlimited amount of threads. // We don't know how many threads we can create.
thread_count = 0; // Do the ideal thing and create 1 thread per core.
unlimited_threads = true; max_count = 1;
} }
} }
// Create the new Scheduler. debug!("Creating Scheduler with {} max threads.", max_count);
Scheduler Scheduler
{ {
has_unlimited_threads: unlimited_threads, minimum_thread_amount: min_count,
maximum_thread_amount: thread_count, maximum_thread_amount: max_count,
thread_list: Vec::new() maximum_thread_idle_time: Duration::new(5, 0),
compute_threads: Vec::with_capacity(max_count as usize),
blocking_threads: Vec::with_capacity(max_count as usize),
task_queue: Arc::new(Mutex::new(BinaryHeap::new()))
} }
} }
fn create_thread() /// Queues a task to be processed.
pub fn queue_task(&mut self, task: Task)
{
// Just add the task to the queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
guard.deref_mut().push(task);
}
Err(error) =>
{
error!("{}", error);
}
}
}
/// Processing the currently queued tasks.
///
/// If this is not a single iteration, then it will block
/// the current thread until all tasks are finished processed.
/// This does not count immortal tasks.
pub fn process_tasks(&mut self, is_single_iteration: bool)
{
let mut continue_processing: bool;
let mut new_thread_count: u64;
// Continue to process the tasks as long as possible.
continue_processing = true;
while continue_processing == true
{
debug!("There are currently {} tasks to process.",
self.get_task_count());
debug!("Currently running {} out of {} threads.",
self.compute_threads.len(), self.maximum_thread_amount);
// Determine how many threads need to be created.
new_thread_count = self.determine_new_thread_amount();
// Create the threads that we determined that we needed
// to process the current tasks.
self.spawn_compute_threads(new_thread_count);
// Check to see if this was to be a single iteration
// of the task scheduler.
if is_single_iteration == true
{
// This was only a single iteration,
// so stop the processing.
continue_processing = false;
}
// else
// {
// Let the thread sleep before
// handling the next iteration.
::std::thread::sleep(Duration::new(1, 0));
// }
}
}
/// Get the amount of tasks in the queue.
fn get_task_count(&mut self) -> u64
{
// Get the size of the task queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
return guard.deref_mut().len() as u64;
}
Err(error) =>
{
error!("{}", error);
}
}
0u64
}
/// This will calculate the new amount of compute threads
/// that need to be created.
fn determine_new_thread_amount(&mut self) -> u64
{
let task_count: u64;
let thread_count: u64;
let potential_threads: u64;
let mut desired_threads: u64;
let mut available_threads: u64;
let mut new_thread_count: u64;
// Determine the amount of tasks that need to be processed.
task_count = self.get_task_count();
debug!("There are currently {} compute tasks to process.", task_count);
// Determine how many threads we currently have available.
thread_count = self.compute_threads.len() as u64;
debug!("Currently running {} out of {} compute threads.",
thread_count, self.maximum_thread_amount);
// Determine how many of these threads currently are
// not processing anything.
available_threads = 0u64;
for thread_data in self.compute_threads.iter()
{
match thread_data.state
{
ThreadState::Idle =>
{
available_threads += 1u64;
}
ThreadState::Starting =>
{
available_threads += 1u64;
}
_ =>
{ {
} }
}
}
// Calculate how many threads we would need to
// process all the tasks in our queue if each
// task was given its own thread.
println!("task_count: {}\navailable_threads: {}", task_count, available_threads);
desired_threads = 0u64;
if task_count > available_threads
{
desired_threads = task_count - available_threads;
}
new_thread_count = desired_threads;
// Calculate how many potential threads can be created.
potential_threads = self.maximum_thread_amount - thread_count;
// We are limited by a hard ceiling of runnable threads.
// Make sure we have as many as we can to process
// the currently queued tasks.
if potential_threads < desired_threads
{
// Make sure we have atleast the maximum number
// of threads aloud.
new_thread_count = self.maximum_thread_amount - thread_count;
}
// Return the determined amount of threads to create.
new_thread_count
}
///
fn spawn_compute_threads(&mut self, amount: u64)
{
let mut shutdown_channel: (Sender<bool>, Receiver<bool>);
let mut state_channel: (Sender<ThreadState>, Receiver<ThreadState>);
let mut shutdown_receiver: Receiver<bool>;
let mut state_sender: Sender<ThreadState>;
let mut join_handle: JoinHandle<()>;
let mut task_queue: Arc<Mutex<BinaryHeap<Task>>>;
let mut data: ThreadData;
// Loop through and create all the required threads.
debug!("Spawning {} compute threads", amount);
for _i in 0..amount
{
// Create the channel to shutdown the thread.
shutdown_channel = channel::<bool>();
shutdown_receiver = shutdown_channel.1;
// Create the channel to retrieve the
// status of the thread.
state_channel = channel::<ThreadState>();
state_sender = state_channel.0;
// Clone the task queue for the thread.
task_queue = self.task_queue.clone();
// Create a new Thread.
join_handle = ::std::thread::spawn(move ||
{
let mut check_messages: bool;
let mut continue_running: bool;
let mut current_state: ThreadState;
let mut current_task: Option<Task>;
println!("Starting thread.");
// Threads start off in the starting state.
current_state = ThreadState::Starting;
// This thread will start off with no
// tasks to process.
current_task = None;
// Run this thread until the scheduler decides
// to shut it down.
change_thread_state!(ThreadState::Idle, current_state,
state_sender);
continue_running = true;
while continue_running == true
{
println!("Running thread.");
process_compute_thread(&mut current_task,
&mut current_state,
&mut task_queue,
&state_sender);
// Sleep the thread so that other threads
// get a chance to run
::std::thread::sleep(Duration::new(0, 100));
// Check to see if this thread should be shutdown.
check_messages = true;
while check_messages == true
{
match shutdown_receiver.try_recv()
{
Ok(val) =>
{
continue_running = !val;
}
Err(error) =>
{
match error
{
TryRecvError::Empty =>
{
// No messages to handle.
check_messages = false;
debug!("There were no shutdown messages.");
}
TryRecvError::Disconnected =>
{
// We lost our connection to the
// shutdown channel.
warn!("{}", error);
}
}
}
}
}
}
// This thread is finished.
println!("Shutting down thread.");
change_thread_state!(ThreadState::Idle, current_state,
state_sender);
}
);
// Create a new set of data for the
// thread we want to create.
data = ThreadData::new(join_handle, shutdown_channel.0,
state_channel.1);
// Add the new ThreadData to the set
// of compute threads.
self.compute_threads.push(data);
}
}
}
impl Drop for Scheduler
{
fn drop(&mut self)
{
debug!("Destroying scheduler.");
// Stop any threads that are running.
debug!("Stopping threads.");
for thread_data in self.compute_threads.iter_mut()
{
// Stop the thread.
thread_data.shutdown_sender.send(true);
}
// Wait about 5 seconds to make sure all
// the threads have been stopped.
debug!("Giving threads a chance to end.");
::std::thread::sleep(Duration::new(5, 0));
}
}
///
fn process_compute_thread(current_task: &mut Option<Task>,
current_state: &mut ThreadState,
task_queue: &mut Arc<Mutex<BinaryHeap<Task>>>,
state_sender: &Sender<ThreadState>)
{
let mut task_completed: bool;
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match *current_task
{
Some(ref mut task) =>
{
// Process the task this thread is
// currently working on.
task.process();
match task.state
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
}
None =>
{
// This thread does not have a current task.
// Get another task to work on from the Queue.
match task_queue.lock()
{
Ok(ref mut guard) =>
{
*current_task = guard.deref_mut().pop();
match *current_task
{
Some(_) =>
{
debug!("Received a task.");
change_thread_state!(ThreadState::Processing,
*current_state, state_sender);
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
if current_task.is_none()
{
match *current_state
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
change_thread_state!(ThreadState::Idle, *current_state,
state_sender);
}
}
}
}
}
if task_completed == true
{
println!("Task completed.");
*current_task = None;
change_thread_state!(ThreadState::Idle, *current_state,
state_sender);
}
} }

9
src/status.rs Normal file
View File

@ -0,0 +1,9 @@
///
pub enum Status
{
///
Waiting,
///
Finished
}

View File

@ -1,8 +1,16 @@
use std::cmp::Ordering;
use ::task_state::TaskState;
/// ///
pub struct Task pub struct Task
{ {
/// ///
name: String pub name: String,
///
pub state: TaskState
} }
@ -14,7 +22,50 @@ impl Task
{ {
Task Task
{ {
name: String::from(task_name) name: String::from(task_name),
state: TaskState::Starting
} }
} }
///
pub fn set_processing_func(&self) -> i8
{
0
}
///
pub fn process(&mut self)
{
self.state = TaskState::Processing;
println!("Processing task.");
self.state = TaskState::Finished;
}
}
impl PartialEq for Task
{
fn eq(&self, other: &Self) -> bool
{
self.name.eq(&other.name)
}
}
impl Eq for Task
{
}
impl PartialOrd for Task
{
fn partial_cmp(&self, other: &Self) -> Option<Ordering>
{
Some(self.cmp(other))
}
}
impl Ord for Task
{
fn cmp(&self, other: &Self) -> Ordering
{
self.name.cmp(&other.name)
}
} }

40
src/task_state.rs Normal file
View File

@ -0,0 +1,40 @@
/// The different states a Task can go through during its lifetime.
pub enum TaskState
{
/// The state that every Task starts in.
///
/// The STARTING state can only lead to the processing state.
Starting,
/// The state that each Task is in while it is actually
/// being process and running on a thread.
///
/// The PROCESSING state can only lead to WAITING,
/// FINISHED, or UNKNOWN states.
Processing,
/// The state a Task is in while it is waiting for child
/// tasks to switch to the FINISHED state.
///
/// The WAITING state can only lead to the DONE_WAITING
/// and UNKNOWN states.
Waiting,
/// The state a Task is in once it is done processing
/// child tasks, but prior to going back to the PROCESSING state.
///
/// The DONE_WAITING state can only lead to the PROCESSING
/// and UNKNOWN states.
DoneWaiting,
/// The state a Task will be in when it is FINISHED processing
/// and ready to be destroyed.
///
/// The FINISHED state cannot lead to any states and shows that the
/// task is completed.
Finished,
/// The state a Task will be placed in if it is detected to be
/// in an inproper state during its lifetime.
Unknown
}

10
src/task_type.rs Normal file
View File

@ -0,0 +1,10 @@
///
pub enum TaskType
{
/// A task ment to get its own thread so that
/// it can run as long as it needs to.
Immortal,
///
Mortal
}

View File

@ -1,8 +1,37 @@
use std::collections::binary_heap::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread::JoinHandle;
use std::time::Duration;
use std::vec::Vec;
use ::task::Task;
use ::task_state::TaskState;
use ::thread_data::ThreadData;
use ::thread_state::ThreadState;
use chrono::MonotonicTime;
/// ///
pub struct Thread pub struct Thread
{ {
/// ///
empty: i8 state: ThreadState,
///
current_task: Option<Task>,
///
task_queue: Arc<Mutex<BinaryHeap<Task>>>,
///
shutdown_receiver: Receiver<bool>,
///
state_sender: Sender<ThreadState>
} }
@ -10,11 +39,155 @@ pub struct Thread
impl Thread impl Thread
{ {
/// ///
pub fn new() -> Thread pub fn new(queue: Arc<Mutex<BinaryHeap<Task>>>,
shutdown_r: Receiver<bool>,
state_s: Sender<ThreadState>)
-> Thread
{ {
Thread Thread
{ {
empty: 0 state: ThreadState::Starting,
current_task: None,
task_queue: queue,
shutdown_receiver: shutdown_r,
state_sender: state_s
}
}
///
pub fn process(&mut self)
{
let mut task_completed: bool;
// No task was recently completed.
task_completed = false;
// Make sure that this thread has a Task
// to currently work on.
match self.current_task
{
Some(ref mut task) =>
{
// Process the task this thread is
// currently working on.
task.process();
match task.state
{
TaskState::Finished =>
{
task_completed = true;
}
_ =>
{
}
}
}
None =>
{
// This thread does not have a current task.
// Get another task to work on from the Queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
self.current_task = guard.deref_mut().pop();
match self.current_task
{
Some(_) =>
{
debug!("Received a task.");
}
None =>
{
debug!("No new task to process.");
}
}
}
Err(error) =>
{
error!("{}", error);
}
}
// If we don't have a task to process,
// then we may need to switch over to
// an idle state.
match self.current_task
{
Some(_) =>
{
self.change_state(ThreadState::Processing);
}
None =>
{
match self.state
{
ThreadState::Idle =>
{
// The thread is already sitting idle.
}
_ =>
{
// There is nothing for this thread
// to process, so mark the thread as idle.
self.change_state(ThreadState::Idle);
}
}
}
}
}
}
if task_completed == true
{
println!("Task completed.");
self.current_task = None;
self.change_state(ThreadState::Idle);
}
}
///
fn change_state(&mut self, new_state: ThreadState)
{
self.state = new_state;
match self.state_sender.send(self.state)
{
Ok(_) =>
{
}
Err(error) =>
{
// We lost our connection to the
// state channel.
warn!("{}", error);
}
}
}
/// Queues a task to be processed.
fn queue_task(&mut self, task: Task)
{
// Just add the task to the queue.
match self.task_queue.lock()
{
Ok(ref mut guard) =>
{
guard.deref_mut().push(task);
}
Err(error) =>
{
error!("{}", error);
}
} }
} }
} }

45
src/thread_data.rs Normal file
View File

@ -0,0 +1,45 @@
use std::sync::mpsc::{Sender, Receiver};
use std::thread::JoinHandle;
use ::thread_state::ThreadState;
use chrono::MonotonicTime;
///
pub struct ThreadData
{
///
pub state: ThreadState,
///
pub handle: JoinHandle<()>,
///
pub idle_time: MonotonicTime,
///
pub shutdown_sender: Sender<bool>,
///
pub state_receiver: Receiver<ThreadState>,
}
impl ThreadData
{
pub fn new(join_handle: JoinHandle<()>, shutdown_s: Sender<bool>,
state_r: Receiver<ThreadState>) -> ThreadData
{
ThreadData
{
state: ThreadState::Starting,
handle: join_handle,
idle_time: MonotonicTime::current(),
shutdown_sender: shutdown_s,
state_receiver: state_r
}
}
}

16
src/thread_state.rs Normal file
View File

@ -0,0 +1,16 @@
///
#[derive(Clone, Copy)]
pub enum ThreadState
{
///
Starting,
///
Idle,
///
Processing,
///
Finished
}