diff --git a/src/ring_buffer.rs b/src/ring_buffer.rs index fc9c9bf..54e3a78 100644 --- a/src/ring_buffer.rs +++ b/src/ring_buffer.rs @@ -1,106 +1,360 @@ -use core::sync::atomic::{AtomicUsize, Ordering}; use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicUsize, Ordering}; +/// Errors that can occur when using the [`RingBuffer`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BufferError { + /// This will occur when you try to set the capacity of the [`RingBuffer`] to + /// zero. InvalidCapacity, + + /// This will occur when the [`RingBuffer`] is unable to + /// accomodate any new items, signaling back pressure. Full, - Empty } +/// This is here to make sure that the defined type is on it's own CacheLine. +/// Targets are x86_64 and a STM32H7. +/// +/// x86_64 has a cache line size of 64 bytes. +/// STM32H7 has a cache line size of 32 bytes. +/// +/// Adjust the repr alignment as needed. Normally this would use cfg based on +/// the target_arch or based on a value from Cargo.toml. +#[repr(align(64))] +struct CacheLinePadded { pub value: T } + + /// A heapless SPSC RingBuffer. /// +/// The implementation uses Atomics to track the amount of items in the memory +/// buffer. These atomics act as guards on the memory buffers access. Because +/// the atomics are going to be constantly incrementing and wrapping around as +/// the buffer is used their values are not in the buffers space and will need +/// to be constrained from counter space to buffer space with a modulo. +/// /// - T is the type to store in the buffer. /// - N is the size of the buffer. pub struct RingBuffer { - buffer: [UnsafeCell; N], - head: AtomicUsize, - tail: AtomicUsize + /// The memory to store into. + /// + /// The UnsafeCell is to allow interior mutability. + /// Access is controlled with the atomic accessors. + /// + /// The MaybeUninit is used here to allow for uninitialized memory. + /// This allows us to not require a Default implementation on T for + /// buffer initialization. + buffer: [UnsafeCell>; N], + + /// The head value. + /// + /// Like a queue this will only be added to until it wraps + /// around. This will increment from 0 to usize::MAX. It doesn't + /// represent the position in the buffer directly, it will need to be + /// reconciled into a buffer position by using modulo. + head: CacheLinePadded, + + /// The tail value. + /// + /// Like a queue this will only be added to until it wraps + /// around. This will increment from 0 to usize::MAX. It doesn't + /// represent the position in the buffer directly, it will need to be + /// reconciled into a buffer position by using modulo. + tail: CacheLinePadded, } -unsafe impl Send for RingBuffer {} - - -impl RingBuffer +impl RingBuffer { /// Create a new RingBuffer that can hold N items of type T. /// - /// # T - /// - Must implement Default. - /// /// # Capacity /// - Must be greater than 0. /// - Faster operations if capacity is a power of 2. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// assert_eq!(buffer.len(), 0); + /// assert_eq!(buffer.capacity(), 32); + /// ``` pub fn new() -> Result { if N == 0 { return Err(BufferError::InvalidCapacity) } Ok(RingBuffer { - buffer: core::array::from_fn(|_| UnsafeCell::new(T::default())), - head: AtomicUsize::new(0), - tail: AtomicUsize::new(0) + buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N], + head: CacheLinePadded:: { value: AtomicUsize::new(0) }, + tail: CacheLinePadded:: { value: AtomicUsize::new(0) } }) } - pub fn push(&self, value: T) -> Result<(), BufferError> + /// The amount of items that can be stored in the `RingBuffer`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// let cap = buffer.capacity(); + /// let len = buffer.len(); + /// + /// assert_ne!(cap, len); + /// assert_eq!(len, 0); + /// assert_eq!(cap, 32); + /// ``` + pub const fn capacity(&self) -> usize { N } + + /// The amount of items in the `RingBuffer`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// buffer.push_back(42).expect("Buffer is full."); + /// buffer.push_back(17).expect("Buffer is full."); + /// buffer.push_back(134).expect("Buffer is full."); + /// buffer.push_back(202).expect("Buffer is full."); + /// buffer.push_back(244).expect("Buffer is full."); + /// + /// assert_eq!(buffer.len(), 6); + /// assert_ne!(buffer.capacity(), buffer.len()); + /// ``` + pub fn len(&self) -> usize { - // Obtain tail and head. - let tail = self.tail.load(Ordering::Relaxed); - let head = self.head.load(Ordering::Acquire); + // Both producers and consumers will be calling this function. Grab the + // head and tail really quickly using Acquire so that the length is + // accurate. + let tail = self.tail.value.load(Ordering::Acquire); + let head = self.head.value.load(Ordering::Acquire); + + // Remember that the atomics are working in a counter space, so wrapping + // sub is required. + tail.wrapping_sub(head) + } + + /// Check if the `RingBuffer` is empty. + /// + /// # Returns + /// `true` if the `RingBuffer` is empty; otherwise it will return `false`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// let _ = buffer.pop_front(); + /// + /// assert!(buffer.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool + { + // The consumer will be calling this function. Acquire the tail position + // from the producer. + let head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Acquire); + + // Empty is when the tail and the head in counter space are the same. + head == tail + } + + /// Check if the `RingBuffer` is full. + /// + /// # Returns + /// `true` if the `RingBuffer` is full; otherwise it will return `false`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// buffer.push_back(15).expect("Buffer is full."); + /// buffer.push_back(42).expect("Buffer is full."); + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// + /// assert!(buffer.is_full()); + /// ``` + pub fn is_full(&self) -> bool + { + // The producer will be calling this function. Acquire the head position + // from the consumer. + let tail = self.tail.value.load(Ordering::Relaxed); + let head = self.head.value.load(Ordering::Acquire); + + // Full is marked when the distance between the head and tail in counter + // space is equal to our memory buffers capacity. + tail.wrapping_sub(head) == N + } + + /// Add an item to the buffer. + /// + /// # Returns + /// `Ok` if the operation was a success. + /// `Err` with BufferError::Full to signal back pressure. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// buffer.push_back(3).expect("Buffer is full."); + /// buffer.push_back(4).expect("Buffer is full."); + /// + /// assert_eq!(buffer.len(), 4); + /// ``` + pub fn push_back(&self, value: T) -> Result<(), BufferError> + { + // Obtain tail and head. Here the producer is pushing into the buffer, so + // we need to acquire the heads position from the consumer. The tail is + // just aquired because the producer is the one that should be changing + // it. + let tail = self.tail.value.load(Ordering::Relaxed); + let head = self.head.value.load(Ordering::Acquire); // Check if the buffer is Full. + // Full is marked when the distance between the head and tail in counter + // space is equal to our memory buffers capacity. if tail.wrapping_sub(head) == N { return Err(BufferError::Full); } // Add the item to the buffer. Here we are using the faster bit // masking if the capacity is a power of two. Otherwise we do the // division. let index = get_index(tail, N); + unsafe { (*self.buffer[index].get()).write(value); } - unsafe - { - *self.buffer[index].get() = value; - } - - // Increment and store the new tail position. - self.tail.store(tail.wrapping_add(1), Ordering::Release); + // Increment and store the new tail position, this will publish it so the + // consumer can read it. + self.tail.value.store(tail.wrapping_add(1), Ordering::Release); Ok(()) } - pub fn pop(&self) -> Result + /// Remove an Item from the `RingBuffer`. + /// + /// # Returns + /// `Some` with the item removed from the `RingBuffer`. + /// `None` when the [`RingBuffer`] has no items to return because it + /// is empty. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// buffer.push_back(3).expect("Buffer is full."); + /// buffer.push_back(4).expect("Buffer is full."); + /// + /// let item = buffer.pop_front().expect("The head item couldn't be removed."); + /// + /// assert_eq!(item, 1); + /// assert_eq!(buffer.len(), 3); + /// ``` + pub fn pop_front(&self) -> Option { - // Obtain head and tail. - let head = self.head.load(Ordering::Relaxed); - let tail = self.tail.load(Ordering::Acquire); + // Obtain head and tail. Here the consumer is removing from the buffer, + // so we need to acquire the tails position from the producer. The head + // is just aquired because the consumer is the one that should be + // changing it. + let head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Acquire); // Check if the buffer is empty. - if head == tail { return Err(BufferError::Empty); } + // Empty is when the tail and the head in counter space are the same. + if head == tail { return None; } // Get the item from the buffer. Here we are using the faster bit // masking if the capacity is a power of two. Otherwise we do the // division. let index = get_index(head, N); - let value = unsafe { *self.buffer[index].get() }; + let value = unsafe { (*self.buffer[index].get()).assume_init_read() }; - // Increment the head to the next item. - self.head.store(head.wrapping_add(1), Ordering::Release); + // Increment the head to the next item, this will publish it so the + // producer can read it. + self.head.value.store(head.wrapping_add(1), Ordering::Release); - Ok(value) + Some(value) } } -// Use the faster bit masking if the capacity is a power of two. -// Otherwise we do the division. This should get optimized away. +// Since we can store any type of T in the RingBuffer we need to make sure that +// items moved into it are released when the buffer is to be released. +// +// **Note** Make sure that the Drop of any stored items does not panic. +impl Drop for RingBuffer +{ + fn drop(&mut self) + { + // Get the head and tail positions. Noone else can be using the buffer + // here because we have sole ownership of it through the mutable reference + // of self. + let mut head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Relaxed); + + // Loop through and drop each item. + while head != tail + { + let index = get_index(head, N); + unsafe { (*self.buffer[index].get()).assume_init_drop() }; + + head = head.wrapping_add(1); + } + } +} + + +// The RingBuffer is using atomics to guard the access to the buffer. This +// allows us to state that the buffer is designed to be used from multiple +// threads from the same reference. The RingBuffer requires that its items all +// be Send so that they can traverse the threads pushing and pulling from it. +unsafe impl Sync for RingBuffer {} + + + +/// Map the counter space to the buffer space. +/// +/// Use the faster bit masking if the capacity is a power of two. +/// Otherwise we do the division. This should get optimized away. +/// I have not yet checked the output to make sure. +/// +/// TODO: Check that this is getting optimized right. +/// +/// # Arguments +/// `pos` This is the position in counter space. +/// `cap` The capacity, size, of the buffer space. #[inline(always)] fn get_index(pos: usize, cap: usize) -> usize {