Implement concurrent trash dropping which seems to work!
Co-authored-by: Bennet <bennet@zed.dev>
@@ -5,26 +5,60 @@ use std::{
     ops::{Deref, DerefMut},
     ptr,
     rc::Rc,
-    sync::atomic,
-    sync::atomic::AtomicPtr,
+    sync::{
+        atomic::{self, AtomicPtr},
+        Arc,
+    },
+    thread,
+    time::Duration,
 };
 
-struct ArenaElementHeader {
-    next_element: *mut u8,
-    value: *mut u8,
-    drop_mutex: Mutex<unsafe fn(*mut u8)>,
-}
 
+/// An arena allocator with the following properties:
+///
+/// * Support for mixed-type values.
+/// * Can only allocate from one thread.
+/// * Can invalidate + trash all current values in constant time.
+/// * Support for concurrently dropping trash from a thread other than the allocating thread.
+///
+/// The internal layout is a circular buffer (ring buffer) that looks like this:
+///
+/// ```text
+/// <     tttttttttttttvvvvvvvvvvv           >
+/// ^     ^            ^          ^          ^
+/// start trash_start  trash_end  next_write end
+/// ```
+///
+/// Illustrating the circular nature of the buffer where the values wrap around:
+///
+/// ```text
+/// <vvvvvvv           tttttttttttvvvvvvvvvvvv>
+/// ^       ^          ^          ^           ^
+/// start   next_write trash_start trash_end  end
+/// ```
 pub struct Arena {
-    start: *mut u8,
-    end: *mut u8,
-    next_write: AtomicPtr<u8>,
-    trash_start: AtomicPtr<u8>,
-    trash_end: AtomicPtr<u8>,
+    shared: Arc<SharedState>,
+    next_write: *mut u8,
     valid: Rc<Cell<bool>>,
 }
 
-pub struct MustDropTrash();
+/// Mutable state shared by `Arena` and `TrashHandle`.
+struct SharedState {
+    start: AtomicPtr<u8>,
+    end: AtomicPtr<u8>,
+    trash_start: AtomicPtr<u8>,
+    trash_end: AtomicPtr<u8>,
+    drop_trash_is_running: Mutex<()>,
+}
+
+/// `TrashHandle` is used to drop trashed values from the arena. It can be used from threads other
+/// than the allocating thread.
+pub struct TrashHandle(Arc<SharedState>);
+
+impl TrashHandle {
+    pub fn drop_trash(&self) {
+        self.0.drop_trash_if_not_already_running();
+    }
+}
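
For orientation, a minimal usage sketch of the flow this pair of types enables (not part of the diff; it assumes `TrashHandle` is `Send`, which the `Arc<SharedState>` design implies, and that `Arena::alloc` takes a closure as in the tests below):

```rust
// Hypothetical illustration, not code from this commit.
fn frame_sketch(arena: &mut Arena) {
    // Allocate on the owning thread.
    let n = arena.alloc(|| 42u64);
    assert_eq!(*n, 42);

    // Invalidate all live values in constant time; existing `ArenaBox`es
    // must no longer be dereferenced.
    let handle: TrashHandle = arena.trash_everything();

    // Run the actual destructors on another thread while this thread
    // immediately starts allocating the next frame.
    std::thread::spawn(move || handle.drop_trash());
}
```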
 
 impl Arena {
     pub fn new(size_in_bytes: usize) -> Self {
@@ -32,67 +66,36 @@ impl Arena {
             let layout = alloc::Layout::from_size_align(size_in_bytes, 1).unwrap();
             let start = alloc::alloc(layout);
             let end = start.add(size_in_bytes);
-            Self {
-                start,
-                end,
-                next_write: AtomicPtr::new(start),
+            let shared = Arc::new(SharedState {
+                start: AtomicPtr::new(start),
+                end: AtomicPtr::new(end),
                 trash_start: AtomicPtr::new(start),
                 trash_end: AtomicPtr::new(start),
+                drop_trash_is_running: Mutex::new(()),
+            });
+            Self {
+                shared,
+                next_write: start,
                 valid: Rc::new(Cell::new(true)),
             }
         }
     }
 
     pub fn len(&self) -> usize {
-        self.next_write.load(atomic::Ordering::Relaxed) as usize - self.start as usize
+        self.next_write as usize - self.shared.start() as usize
     }
 
     pub fn capacity(&self) -> usize {
-        self.end as usize - self.start as usize
+        self.shared.end() as usize - self.shared.start() as usize
     }
 
-    pub fn clear(&mut self) {
-        let must_drop_trash = self.trash_everything();
-        self.drop_trash(must_drop_trash);
-    }
-
-    pub fn trash_everything(&mut self) -> MustDropTrash {
+    pub fn trash_everything(&mut self) -> TrashHandle {
         self.valid.set(false);
         self.valid = Rc::new(Cell::new(true));
-        let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        unsafe {
-            // Help finish last `drop_trash` if still ongoing. This is so that there is only one
-            // concurrent `drop_trash` happening at a time.
-            self.ensure_no_trash_before(trash_end);
-        }
-        let next_write = self.next_write.load(atomic::Ordering::Relaxed);
-        let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-        assert!(trash_start == trash_end);
-        self.trash_end.store(next_write, atomic::Ordering::Relaxed);
-        MustDropTrash()
-    }
-
-    pub fn drop_trash(&self, _: MustDropTrash) {
-        let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-        let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        let mut offset = trash_start;
-        while ptr_is_within_circular_interval(offset, trash_start, trash_end) {
-            unsafe {
-                offset = drop_arena_element(offset);
-            }
-            self.trash_start.store(offset, atomic::Ordering::Relaxed);
-        }
-    }
-
-    unsafe fn ensure_no_trash_before(&self, target: *mut u8) {
-        let mut offset = self.trash_start.load(atomic::Ordering::Relaxed);
-        let mut trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        if ptr_is_within_circular_interval(target, offset, trash_end) {
-            log::warn!("Unexpectedly needed to clear element arena trash on main thread.");
-            while ptr_is_within_circular_interval(target, offset, trash_end) {
-                offset = drop_arena_element(offset);
-            }
-        }
+        self.shared
+            .trash_end
+            .store(self.next_write, atomic::Ordering::Relaxed);
+        TrashHandle(self.shared.clone())
     }
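
The constant-time invalidation above rests on the `valid: Rc<Cell<bool>>` token: each `ArenaBox` holds a clone of the token that was current when it was allocated, and dereferencing checks it. A standalone sketch of that pattern (hypothetical names, not types from this diff):

```rust
use std::{cell::Cell, rc::Rc};

// `Owner`/`Slot` are illustrative stand-ins for `Arena`/`ArenaBox`.
struct Owner {
    valid: Rc<Cell<bool>>,
}

struct Slot<T> {
    value: T,
    valid: Rc<Cell<bool>>, // clone of the owner's token at allocation time
}

impl Owner {
    fn alloc<T>(&self, value: T) -> Slot<T> {
        Slot { value, valid: self.valid.clone() }
    }

    // O(1) invalidation: flip the old token, then hand out a fresh one.
    fn invalidate_all(&mut self) {
        self.valid.set(false);
        self.valid = Rc::new(Cell::new(true));
    }
}

impl<T> Slot<T> {
    fn get(&self) -> &T {
        assert!(self.valid.get(), "slot used after invalidation");
        &self.value
    }
}
```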
 
     #[inline(always)]
@@ -102,31 +105,31 @@ impl Arena {
         }
 
         unsafe {
-            let next_write = self.next_write.load(atomic::Ordering::Relaxed);
-            let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-            let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-
             let (header_ptr, offset) =
-                self.get_wrapped_allocation_ptrs::<ArenaElementHeader>(next_write);
+                self.get_wrapped_allocation_ptrs::<ArenaElementHeader>(self.next_write);
             let (value_ptr, offset) = self.get_wrapped_allocation_ptrs::<T>(offset);
-            assert!(offset <= self.end, "not enough space in Arena");
+            assert!(offset <= self.shared.end(), "not enough space in Arena");
+            let trash_start = self.shared.trash_start();
+            let trash_end = self.shared.trash_end();
             assert!(
-                !ptr_is_within_circular_interval(offset, trash_start, trash_end),
+                // FIXME: this is broken.
+                trash_start == trash_end
+                    || ptr_is_within_circular_interval(offset, self.next_write, trash_start),
                 "not enough space in Arena"
             );
-            self.ensure_no_trash_before(offset);
+            self.shared.ensure_no_trash_before(offset);
 
             ptr::write(
                 header_ptr,
                 ArenaElementHeader {
                     next_element: offset,
-                    value: value_ptr.cast(),
-                    drop_mutex: Mutex::new(drop::<T>),
+                    value: AtomicPtr::new(value_ptr.cast()),
+                    drop: drop::<T>,
                 },
             );
             ptr::write(value_ptr, f());
 
-            self.next_write.store(offset, atomic::Ordering::Relaxed);
+            self.next_write = offset;
 
             ArenaBox {
                 ptr: value_ptr,
@@ -138,16 +141,96 @@ impl Arena {
     #[inline(always)]
     unsafe fn get_wrapped_allocation_ptrs<T>(&self, offset: *mut u8) -> (*mut T, *mut u8) {
         let (ptr, next_offset) = get_allocation_ptrs::<T>(offset);
-        if next_offset < self.end {
+        if next_offset < self.shared.end() {
             (ptr, next_offset)
-        } else if next_offset == self.end {
-            (ptr, self.start)
+        } else if next_offset == self.shared.end() {
+            (ptr, self.shared.start())
         } else {
-            get_allocation_ptrs::<T>(self.start)
+            get_allocation_ptrs::<T>(self.shared.start())
         }
     }
 }
 
+impl Drop for SharedState {
+    fn drop(&mut self) {
+        self.drop_trash_ensure_finishes();
+    }
+}
+
+impl SharedState {
+    fn drop_trash_if_not_already_running(&self) {
+        self.drop_trash_impl(false);
+    }
+
+    fn drop_trash_ensure_finishes(&self) {
+        self.drop_trash_impl(true);
+    }
+
+    fn drop_trash_impl(&self, ensure_finishes: bool) {
+        if let Some(_guard) = self.drop_trash_is_running.try_lock() {
+            let trash_start = self.trash_start();
+            let mut offset = trash_start;
+            // trash_end is re-queried as this might run concurrently with multiple
+            // trash_everything calls.
+            //
+            // FIXME: what if there is nothing to trash / nothing was allocated?
+            while ptr_is_within_circular_interval(offset, trash_start, self.trash_end()) {
+                unsafe {
+                    offset = drop_arena_element(offset);
+                }
+                self.trash_start.store(offset, atomic::Ordering::Relaxed);
+            }
+        } else if ensure_finishes {
+            unsafe {
+                // If we can't get the lock, help out with trashing.
+                self.ensure_no_trash_before(self.trash_end());
+            }
+        }
+    }
+
+    unsafe fn ensure_no_trash_before(&self, target: *mut u8) {
+        let mut offset = self.trash_start();
+        let trash_end = self.trash_end();
+        if ptr_is_within_circular_interval(target, offset, trash_end) {
+            log::warn!("Unexpectedly needed to clear element arena trash on main thread.");
+            while ptr_is_within_circular_interval(target, offset, trash_end) {
+                // Note that it is possible for the background thread to increment trash_start
+                // beyond this offset while the drop is still doing work. This is fine because
+                // this code runs on the same thread as allocations, so allocations will only
+                // see a valid trash_start.
+                offset = drop_arena_element(offset);
+            }
+        }
+
+        // FIXME: won't make progress if the background thread is starved. If ArenaElementHeader
+        // stores a state indicating that the drop has happened, this can possibly be fixed.
+        if ptr_is_within_circular_interval(target, self.trash_start(), trash_end) {
+            log::error!(
+                "Needed to spin waiting for background thread to clear element arena trash."
+            );
+            while ptr_is_within_circular_interval(target, self.trash_start(), trash_end) {
+                // TODO: revisit choice of sleep duration
+                thread::sleep(Duration::from_micros(50));
+            }
+        }
+    }
+
+    fn start(&self) -> *mut u8 {
+        self.start.load(atomic::Ordering::Relaxed)
+    }
+
+    fn end(&self) -> *mut u8 {
+        self.end.load(atomic::Ordering::Relaxed)
+    }
+
+    fn trash_start(&self) -> *mut u8 {
+        self.trash_start.load(atomic::Ordering::Relaxed)
+    }
+
+    fn trash_end(&self) -> *mut u8 {
+        self.trash_end.load(atomic::Ordering::Relaxed)
+    }
+}
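
`drop_trash_impl` relies on `try_lock` so at most one thread walks the trash at a time; every other caller either returns immediately or, when the trash must be gone, helps via `ensure_no_trash_before`. The `Option`-returning `try_lock` suggests a `parking_lot`-style `Mutex` (an assumption; the import is outside this diff). A reduced sketch of the single-runner guard, using std's `Mutex`, whose `try_lock` returns a `Result` instead:

```rust
use std::sync::Mutex;

// Reduced sketch of the "at most one runner" pattern.
struct Worker {
    running: Mutex<()>,
}

impl Worker {
    fn run_if_not_already_running(&self) {
        // `try_lock` fails immediately if another thread holds the guard,
        // so concurrent callers bail out instead of queueing up.
        if let Ok(_guard) = self.running.try_lock() {
            // ... do the work while holding the guard ...
        }
    }
}
```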
 
 #[inline(always)]
 unsafe fn get_allocation_ptrs<T>(offset: *mut u8) -> (*mut T, *mut u8) {
     let data_layout = alloc::Layout::new::<T>();
@@ -156,7 +239,10 @@ unsafe fn get_allocation_ptrs<T>(offset: *mut u8) -> (*mut T, *mut u8) {
     (data_ptr, offset.add(data_layout.size()))
 }
 
-// TODO: many uses of this only actively use one of the bounds.
+// FIXME: many uses of this only actively use one of the bounds.
+//
+// FIXME: start == end could either mean everything is included or nothing. Here it is nothing
+// included, but are there corner cases where the actual state is everything?
 #[inline(always)]
 fn ptr_is_within_circular_interval<T>(ptr: *const T, start: *const T, end: *const T) -> bool {
     if end < start {
@@ -166,24 +252,23 @@ fn ptr_is_within_circular_interval<T>(ptr: *const T, start: *const T, end: *cons
     }
 }
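
The hunk boundary elides the body of `ptr_is_within_circular_interval`. Given the half-open `[start, end)` semantics used throughout and the wrap case hinted at by `if end < start`, a plausible completion is the following sketch (not the committed body):

```rust
// Sketch: half-open [start, end) that wraps past the end of the buffer
// when end < start; start == end is the empty interval, matching the
// FIXME above.
fn ptr_is_within_circular_interval<T>(ptr: *const T, start: *const T, end: *const T) -> bool {
    if end < start {
        // Wrapped: covers [start, buffer end) plus [buffer start, end).
        ptr >= start || ptr < end
    } else {
        ptr >= start && ptr < end
    }
}
```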
 
-unsafe fn drop_arena_element(offset: *mut u8) -> *mut u8 {
-    let (header, _) = get_allocation_ptrs::<ArenaElementHeader>(offset);
-    if (*header).value.is_null() {
-        return (*header).next_element;
-    }
-    let drop_fn = (*header).drop_mutex.lock();
-    if (*header).value.is_null() {
-        return (*header).next_element;
-    }
-    (drop_fn)((*header).value);
-    (*header).value = ptr::null_mut();
-    (*header).next_element
-}
-
-impl Drop for Arena {
-    fn drop(&mut self) {
-        self.clear();
-    }
-}
+struct ArenaElementHeader {
+    next_element: *mut u8,
+    value: AtomicPtr<u8>,
+    drop: unsafe fn(*mut u8),
+}
+
+unsafe fn drop_arena_element(offset: *mut u8) -> *mut u8 {
+    let (header, _) = get_allocation_ptrs::<ArenaElementHeader>(offset);
+    let value = (*header)
+        .value
+        .swap(ptr::null_mut(), atomic::Ordering::AcqRel);
+    if value.is_null() {
+        return (*header).next_element;
+    }
+    let drop_fn = (*header).drop;
+    (drop_fn)(value);
+    (*header).next_element
+}
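
The rewritten `drop_arena_element` replaces the old double-checked `drop_mutex` with a claim-by-swap: whichever thread swaps the value pointer to null first runs the destructor, and any concurrent or later caller sees null and skips the element. A standalone sketch of that idea:

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

// Two threads may race to clean up the same slot, but the AcqRel swap
// guarantees exactly one of them observes the non-null pointer and runs
// the destructor.
fn claim_and_drop(slot: &AtomicPtr<u8>, drop_fn: unsafe fn(*mut u8)) {
    let value = slot.swap(ptr::null_mut(), Ordering::AcqRel);
    if !value.is_null() {
        // We won the race; no other thread can see this pointer again.
        unsafe { drop_fn(value) };
    }
}
```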
 
 pub struct ArenaBox<T: ?Sized> {
@@ -270,7 +355,7 @@ mod tests {
         assert_eq!(*c, 3);
         assert_eq!(*d, 4);
 
-        arena.clear();
+        arena.trash_everything().drop_trash();
         let a = arena.alloc(|| 5u64);
         let b = arena.alloc(|| 6u32);
         let c = arena.alloc(|| 7u16);
@@ -289,7 +374,7 @@ mod tests {
             }
         }
         arena.alloc(|| DropGuard(dropped.clone()));
-        arena.clear();
+        arena.trash_everything().drop_trash();
         assert!(dropped.get());
     }
 
@@ -328,7 +413,7 @@ mod tests {
         let mut arena = Arena::new(256);
         let value = arena.alloc(|| 1u64);
 
-        arena.clear();
+        arena.trash_everything().drop_trash();
         let _read_value = *value;
     }
 }
@@ -117,6 +117,7 @@ slotmap::new_key_type! {
 
 thread_local! {
     /// 8MB wasn't quite enough...
+    /// TODO: make this blow up when called from other threads
     pub(crate) static ELEMENT_ARENA: RefCell<Arena> = RefCell::new(Arena::new(32 * 1024 * 1024));
 }
 
@@ -1441,24 +1442,28 @@ impl<'a> WindowContext<'a> {
         self.window
             .next_frame
             .finish(&mut self.window.rendered_frame);
-        let must_drop_trash = ELEMENT_ARENA.with_borrow_mut(|element_arena| {
+        let trash_handle = ELEMENT_ARENA.with_borrow_mut(|element_arena| {
             let percentage = (element_arena.len() as f32 / element_arena.capacity() as f32) * 100.;
             if percentage >= 80. {
                 log::warn!("elevated element arena occupation: {}.", percentage);
             }
             let start = SystemTime::now();
-            let must_drop_trash = element_arena.trash_everything();
+            let trash_handle = element_arena.trash_everything();
             log::error!(
-                "Clear time = {:?}",
+                "trash_everything elapsed time = {:?}",
                 SystemTime::now().duration_since(start).unwrap()
             );
-            must_drop_trash
+            trash_handle
         });
         self.app
             .background_executor()
             .spawn(async move {
-                // FIXME: doesn't work as it's thread local
-                ELEMENT_ARENA.with_borrow(|element_arena| element_arena.drop_trash(must_drop_trash))
+                let start = SystemTime::now();
+                trash_handle.drop_trash();
+                log::error!(
+                    "drop_trash elapsed time = {:?}",
+                    SystemTime::now().duration_since(start).unwrap()
+                );
             })
             .detach();