From 7a83efe48178a2ebf3b6314084a8c4a0379c9b51 Mon Sep 17 00:00:00 2001 From: Myrddin Dundragon Date: Sun, 15 Feb 2026 13:23:58 -0500 Subject: [PATCH] Copy is no longer required for items stored. This is a big change. Initially items added to the RingBuffer needed to have a Default implementation. This necessitated the use of Copy. However, by doing so the RingBuffer was not able to store non-Copy types like String or Vec. To allow for this change the buffer switched from using Default to initialize the array to using MaybeUninit and UnsafeCell to initialize the array and write to or read the memory location with interior mutability. This means that the atomics are now the definitive guards as was initially desired and the mutable reference needed for push and pop could be removed. Drop was added to clean up the items in the array now that they may need to be dropped themselves. Some convenience functions were added for users to get information about the RingBuffer. --- src/ring_buffer.rs | 328 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 291 insertions(+), 37 deletions(-) diff --git a/src/ring_buffer.rs b/src/ring_buffer.rs index fc9c9bf..54e3a78 100644 --- a/src/ring_buffer.rs +++ b/src/ring_buffer.rs @@ -1,106 +1,360 @@ -use core::sync::atomic::{AtomicUsize, Ordering}; use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::{AtomicUsize, Ordering}; +/// Errors that can occur when using the [`RingBuffer`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BufferError { + /// This will occur when you try to set the capacity of the [`RingBuffer`] to + /// zero. InvalidCapacity, + + /// This will occur when the [`RingBuffer`] is unable to + /// accommodate any new items, signaling back pressure. Full, - Empty } +/// This is here to make sure that the defined type is on its own cache line. +/// Targets are x86_64 and a STM32H7. +/// +/// x86_64 has a cache line size of 64 bytes. 
+/// STM32H7 has a cache line size of 32 bytes. +/// +/// Adjust the repr alignment as needed. Normally this would use cfg based on +/// the target_arch or based on a value from Cargo.toml. +#[repr(align(64))] +struct CacheLinePadded { pub value: T } + + /// A heapless SPSC RingBuffer. /// +/// The implementation uses Atomics to track the amount of items in the memory +/// buffer. These atomics act as guards on the memory buffers access. Because +/// the atomics are going to be constantly incrementing and wrapping around as +/// the buffer is used their values are not in the buffers space and will need +/// to be constrained from counter space to buffer space with a modulo. +/// /// - T is the type to store in the buffer. /// - N is the size of the buffer. pub struct RingBuffer { - buffer: [UnsafeCell; N], - head: AtomicUsize, - tail: AtomicUsize + /// The memory to store into. + /// + /// The UnsafeCell is to allow interior mutability. + /// Access is controlled with the atomic accessors. + /// + /// The MaybeUninit is used here to allow for uninitialized memory. + /// This allows us to not require a Default implementation on T for + /// buffer initialization. + buffer: [UnsafeCell>; N], + + /// The head value. + /// + /// Like a queue this will only be added to until it wraps + /// around. This will increment from 0 to usize::MAX. It doesn't + /// represent the position in the buffer directly, it will need to be + /// reconciled into a buffer position by using modulo. + head: CacheLinePadded, + + /// The tail value. + /// + /// Like a queue this will only be added to until it wraps + /// around. This will increment from 0 to usize::MAX. It doesn't + /// represent the position in the buffer directly, it will need to be + /// reconciled into a buffer position by using modulo. + tail: CacheLinePadded, } -unsafe impl Send for RingBuffer {} - - -impl RingBuffer +impl RingBuffer { /// Create a new RingBuffer that can hold N items of type T. 
/// - /// # T - /// - Must implement Default. - /// /// # Capacity /// - Must be greater than 0. /// - Faster operations if capacity is a power of 2. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// assert_eq!(buffer.len(), 0); + /// assert_eq!(buffer.capacity(), 32); + /// ``` pub fn new() -> Result { if N == 0 { return Err(BufferError::InvalidCapacity) } Ok(RingBuffer { - buffer: core::array::from_fn(|_| UnsafeCell::new(T::default())), - head: AtomicUsize::new(0), - tail: AtomicUsize::new(0) + buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N], + head: CacheLinePadded:: { value: AtomicUsize::new(0) }, + tail: CacheLinePadded:: { value: AtomicUsize::new(0) } }) } - pub fn push(&self, value: T) -> Result<(), BufferError> + /// The amount of items that can be stored in the `RingBuffer`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// let cap = buffer.capacity(); + /// let len = buffer.len(); + /// + /// assert_ne!(cap, len); + /// assert_eq!(len, 0); + /// assert_eq!(cap, 32); + /// ``` + pub const fn capacity(&self) -> usize { N } + + /// The amount of items in the `RingBuffer`. 
+ /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// buffer.push_back(42).expect("Buffer is full."); + /// buffer.push_back(17).expect("Buffer is full."); + /// buffer.push_back(134).expect("Buffer is full."); + /// buffer.push_back(202).expect("Buffer is full."); + /// buffer.push_back(244).expect("Buffer is full."); + /// + /// assert_eq!(buffer.len(), 6); + /// assert_ne!(buffer.capacity(), buffer.len()); + /// ``` + pub fn len(&self) -> usize { - // Obtain tail and head. - let tail = self.tail.load(Ordering::Relaxed); - let head = self.head.load(Ordering::Acquire); + // Both producers and consumers will be calling this function. Grab the + // head and tail really quickly using Acquire so that the length is + // accurate. + let tail = self.tail.value.load(Ordering::Acquire); + let head = self.head.value.load(Ordering::Acquire); + + // Remember that the atomics are working in a counter space, so wrapping + // sub is required. + tail.wrapping_sub(head) + } + + /// Check if the `RingBuffer` is empty. + /// + /// # Returns + /// `true` if the `RingBuffer` is empty; otherwise it will return `false`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// let _ = buffer.pop_front(); + /// + /// assert!(buffer.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool + { + // The consumer will be calling this function. Acquire the tail position + // from the producer. + let head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Acquire); + + // Empty is when the tail and the head in counter space are the same. + head == tail + } + + /// Check if the `RingBuffer` is full. 
+ /// + /// # Returns + /// `true` if the `RingBuffer` is full; otherwise it will return `false`. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(25).expect("Buffer is full."); + /// buffer.push_back(15).expect("Buffer is full."); + /// buffer.push_back(42).expect("Buffer is full."); + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// + /// assert!(buffer.is_full()); + /// ``` + pub fn is_full(&self) -> bool + { + // The producer will be calling this function. Acquire the head position + // from the consumer. + let tail = self.tail.value.load(Ordering::Relaxed); + let head = self.head.value.load(Ordering::Acquire); + + // Full is marked when the distance between the head and tail in counter + // space is equal to our memory buffer's capacity. + tail.wrapping_sub(head) == N + } + + /// Add an item to the buffer. + /// + /// # Returns + /// `Ok` if the operation was a success. + /// `Err` with BufferError::Full to signal back pressure. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// buffer.push_back(3).expect("Buffer is full."); + /// buffer.push_back(4).expect("Buffer is full."); + /// + /// assert_eq!(buffer.len(), 4); + /// ``` + pub fn push_back(&self, value: T) -> Result<(), BufferError> + { + // Obtain tail and head. Here the producer is pushing into the buffer, so + // we need to acquire the head's position from the consumer. The tail is + // just acquired because the producer is the one that should be changing + // it. 
+ let tail = self.tail.value.load(Ordering::Relaxed); + let head = self.head.value.load(Ordering::Acquire); // Check if the buffer is Full. + // Full is marked when the distance between the head and tail in counter + // space is equal to our memory buffer's capacity. if tail.wrapping_sub(head) == N { return Err(BufferError::Full); } // Add the item to the buffer. Here we are using the faster bit // masking if the capacity is a power of two. Otherwise we do the // division. let index = get_index(tail, N); + unsafe { (*self.buffer[index].get()).write(value); } - unsafe - { - *self.buffer[index].get() = value; - } - - // Increment and store the new tail position. - self.tail.store(tail.wrapping_add(1), Ordering::Release); + // Increment and store the new tail position, this will publish it so the + // consumer can read it. + self.tail.value.store(tail.wrapping_add(1), Ordering::Release); Ok(()) } - pub fn pop(&self) -> Result + /// Remove an item from the `RingBuffer`. + /// + /// # Returns + /// `Some` with the item removed from the `RingBuffer`. + /// `None` when the [`RingBuffer`] has no items to return because it + /// is empty. + /// + /// # Examples + /// ```rust + /// use example::RingBuffer; + /// + /// let buffer: RingBuffer = + /// RingBuffer::new().expect("Buffer size was set to 0"); + /// + /// buffer.push_back(1).expect("Buffer is full."); + /// buffer.push_back(2).expect("Buffer is full."); + /// buffer.push_back(3).expect("Buffer is full."); + /// buffer.push_back(4).expect("Buffer is full."); + /// + /// let item = buffer.pop_front().expect("The head item couldn't be removed."); + /// + /// assert_eq!(item, 1); + /// assert_eq!(buffer.len(), 3); + /// ``` + pub fn pop_front(&self) -> Option + { + // Obtain head and tail. Here the consumer is removing from the buffer, + // so we need to acquire the tail's position from the producer. 
The head + // is just acquired because the consumer is the one that should be + // changing it. + let head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Acquire); // Check if the buffer is empty. - if head == tail { return Err(BufferError::Empty); } + // Empty is when the tail and the head in counter space are the same. + if head == tail { return None; } // Get the item from the buffer. Here we are using the faster bit // masking if the capacity is a power of two. Otherwise we do the // division. let index = get_index(head, N); - let value = unsafe { *self.buffer[index].get() }; + let value = unsafe { (*self.buffer[index].get()).assume_init_read() }; - // Increment the head to the next item. - self.head.store(head.wrapping_add(1), Ordering::Release); + // Increment the head to the next item, this will publish it so the + // producer can read it. + self.head.value.store(head.wrapping_add(1), Ordering::Release); - Ok(value) + Some(value) } } -// Use the faster bit masking if the capacity is a power of two. -// Otherwise we do the division. This should get optimized away. +// Since we can store any type of T in the RingBuffer we need to make sure that +// items moved into it are released when the buffer is to be released. +// +// **Note** Make sure that the Drop of any stored items does not panic. +impl Drop for RingBuffer +{ + fn drop(&mut self) + { + // Get the head and tail positions. No one else can be using the buffer + // here because we have sole ownership of it through the mutable reference + // of self. + let mut head = self.head.value.load(Ordering::Relaxed); + let tail = self.tail.value.load(Ordering::Relaxed); + + // Loop through and drop each item. + while head != tail + { + let index = get_index(head, N); + unsafe { (*self.buffer[index].get()).assume_init_drop() }; + + head = head.wrapping_add(1); + } + } +} + + +// The RingBuffer is using atomics to guard the access to the buffer. 
This +// allows us to state that the buffer is designed to be used from multiple +// threads from the same reference. The RingBuffer requires that its items all +// be Send so that they can traverse the threads pushing and pulling from it. +unsafe impl Sync for RingBuffer {} + + + +/// Map the counter space to the buffer space. +/// +/// Use the faster bit masking if the capacity is a power of two. +/// Otherwise we do the division. This should get optimized away. +/// I have not yet checked the output to make sure. +/// +/// TODO: Check that this is getting optimized right. +/// +/// # Arguments +/// `pos` This is the position in counter space. +/// `cap` The capacity, size, of the buffer space. #[inline(always)] fn get_index(pos: usize, cap: usize) -> usize {