An initial SPSC ring buffer implementation.
This still requires tests, but it is using atomics and UnsafeCell to create a lock free way of accessing the buffer data.
This commit is contained in:
@ -14,3 +14,4 @@
|
||||
|
||||
pub mod basic;
|
||||
pub mod adv_async;
|
||||
pub mod ring_buffer;
|
||||
|
||||
108
src/ring_buffer.rs
Normal file
108
src/ring_buffer.rs
Normal file
@ -0,0 +1,108 @@
|
||||
use core::sync::atomic::{AtomicUsize, Ordering};
|
||||
use core::cell::UnsafeCell;
|
||||
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum BufferError
|
||||
{
|
||||
InvalidCapacity,
|
||||
Full,
|
||||
Empty
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// A heapless SPSC RingBuffer.
|
||||
///
|
||||
/// - T is the type to store in the buffer.
|
||||
/// - N is the size of the buffer.
|
||||
pub struct RingBuffer<T, const N: usize>
|
||||
{
|
||||
buffer: [UnsafeCell<T>; N],
|
||||
head: AtomicUsize,
|
||||
tail: AtomicUsize
|
||||
}
|
||||
|
||||
|
||||
|
||||
unsafe impl<T: Send, const N: usize> Send for RingBuffer<T, N> {}
|
||||
|
||||
|
||||
impl<T: Copy + Default, const N: usize> RingBuffer<T, N>
|
||||
{
|
||||
/// Create a new RingBuffer that can hold N items of type T.
|
||||
///
|
||||
/// # T
|
||||
/// - Must implement Default.
|
||||
///
|
||||
/// # Capacity
|
||||
/// - Must be greater than 0.
|
||||
/// - Faster operations if capacity is a power of 2.
|
||||
pub fn new() -> Result<Self, BufferError>
|
||||
{
|
||||
if N == 0 { return Err(BufferError::InvalidCapacity) }
|
||||
|
||||
Ok(RingBuffer
|
||||
{
|
||||
buffer: core::array::from_fn(|_| UnsafeCell::new(T::default())),
|
||||
head: AtomicUsize::new(0),
|
||||
tail: AtomicUsize::new(0)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn push(&self, value: T) -> Result<(), BufferError>
|
||||
{
|
||||
// Obtain tail and head.
|
||||
let tail = self.tail.load(Ordering::Relaxed);
|
||||
let head = self.head.load(Ordering::Acquire);
|
||||
|
||||
// Check if the buffer is Full.
|
||||
if tail.wrapping_sub(head) == N { return Err(BufferError::Full); }
|
||||
|
||||
// Add the item to the buffer. Here we are using the faster bit
|
||||
// masking if the capacity is a power of two. Otherwise we do the
|
||||
// division.
|
||||
let index = get_index(tail, N);
|
||||
|
||||
unsafe
|
||||
{
|
||||
*self.buffer[index].get() = value;
|
||||
}
|
||||
|
||||
// Increment and store the new tail position.
|
||||
self.tail.store(tail.wrapping_add(1), Ordering::Release);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn pop(&self) -> Result<T, BufferError>
|
||||
{
|
||||
// Obtain head and tail.
|
||||
let head = self.head.load(Ordering::Relaxed);
|
||||
let tail = self.tail.load(Ordering::Acquire);
|
||||
|
||||
// Check if the buffer is empty.
|
||||
if head == tail { return Err(BufferError::Empty); }
|
||||
|
||||
// Get the item from the buffer. Here we are using the faster bit
|
||||
// masking if the capacity is a power of two. Otherwise we do the
|
||||
// division.
|
||||
let index = get_index(head, N);
|
||||
let value = unsafe { *self.buffer[index].get() };
|
||||
|
||||
// Increment the head to the next item.
|
||||
self.head.store(head.wrapping_add(1), Ordering::Release);
|
||||
|
||||
Ok(value)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Use the faster bit masking if the capacity is a power of two.
|
||||
// Otherwise we do the division. This should get optimized away.
|
||||
#[inline(always)]
|
||||
fn get_index(pos: usize, cap: usize) -> usize
|
||||
{
|
||||
if cap.is_power_of_two() { pos & (cap - 1) } else { pos % cap }
|
||||
}
|
||||
Reference in New Issue
Block a user