From a76d3950f4a06e496e84e4bdb2ce8fc30a2ca447 Mon Sep 17 00:00:00 2001
From: "mgsloan@gmail.com" <mgsloan@gmail.com>
Date: Wed, 20 Nov 2024 13:49:09 -0700
Subject: [PATCH] Implement concurrent trash dropping which seems to work!

Co-authored-by: Bennet
---
 crates/gpui/src/arena.rs  | 273 +++++++++++++++++++++++++-------
 crates/gpui/src/window.rs |  17 ++-
 2 files changed, 190 insertions(+), 100 deletions(-)

diff --git a/crates/gpui/src/arena.rs b/crates/gpui/src/arena.rs
index f108f4f75a..6dd87c7c96 100644
--- a/crates/gpui/src/arena.rs
+++ b/crates/gpui/src/arena.rs
@@ -5,26 +5,60 @@ use std::{
     ops::{Deref, DerefMut},
     ptr,
     rc::Rc,
-    sync::atomic,
-    sync::atomic::AtomicPtr,
+    sync::{
+        atomic::{self, AtomicPtr},
+        Arc,
+    },
+    thread,
+    time::Duration,
 };
 
-struct ArenaElementHeader {
-    next_element: *mut u8,
-    value: *mut u8,
-    drop_mutex: Mutex<unsafe fn(*mut u8)>,
-}
-
+/// An arena allocator with the following properties:
+///
+/// * Support for mixed-type values.
+/// * Can only allocate from one thread.
+/// * Can invalidate + trash all current values in constant time.
+/// * Support for concurrently dropping trash from a thread other than the allocating thread.
+///
+/// The internal layout is a circular buffer (ring buffer) that looks like this:
+///
+/// ```text
+/// <   ttttttttttttt  vvvvvvvvvvv      >
+/// ^   ^              ^          ^     ^
+/// |   trash_start    trash_end  |     end
+/// start                         next_write
+/// ```
+///
+/// Illustrating the circular nature of the buffer, where the values wrap around:
+///
+/// ```text
+/// <   vvvvv       ttttttttttttt  vvvvvv >
+/// ^        ^      ^              ^      ^
+/// |        |      trash_start   |      end
+/// start    next_write           trash_end
+/// ```
 pub struct Arena {
-    start: *mut u8,
-    end: *mut u8,
-    next_write: AtomicPtr<u8>,
-    trash_start: AtomicPtr<u8>,
-    trash_end: AtomicPtr<u8>,
+    shared: Arc<SharedState>,
+    next_write: *mut u8,
     valid: Rc<Cell<bool>>,
 }
 
-pub struct MustDropTrash();
+/// Mutable state shared by `Arena` and `TrashHandle`.
+struct SharedState {
+    start: AtomicPtr<u8>,
+    end: AtomicPtr<u8>,
+    trash_start: AtomicPtr<u8>,
+    trash_end: AtomicPtr<u8>,
+    drop_trash_is_running: Mutex<()>,
+}
+
+/// `TrashHandle` is used to drop trashed values from the arena. It can be used from threads
+/// other than the allocating thread.
+pub struct TrashHandle(Arc<SharedState>);
+
+impl TrashHandle {
+    pub fn drop_trash(&self) {
+        self.0.drop_trash_if_not_already_running();
+    }
+}
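The wrap-around membership test that the diagrams describe is easy to get wrong, so here is a minimal standalone model of it, using indices in place of pointers. The function name is invented for this sketch; the patch's `ptr_is_within_circular_interval` is intended to behave analogously, with `start == end` denoting the empty region:

```rust
// Standalone model of the circular interval test over buffer indices.
fn in_circular_range(i: usize, start: usize, end: usize) -> bool {
    if end < start {
        // The region wraps past the buffer end: [start, capacity) and [0, end).
        i >= start || i < end
    } else {
        // Contiguous region; start == end means nothing is included.
        i >= start && i < end
    }
}

fn main() {
    // Unwrapped layout (first diagram): trash occupies [4, 17).
    assert!(in_circular_range(10, 4, 17));
    assert!(!in_circular_range(20, 4, 17));
    // Wrapped layout (second diagram): values occupy [31, 39) and [0, 9),
    // expressed as the wrapped interval [31, 9).
    assert!(in_circular_range(2, 31, 9));
    assert!(in_circular_range(35, 31, 9));
    assert!(!in_circular_range(20, 31, 9));
    // start == end is the empty region.
    assert!(!in_circular_range(7, 7, 7));
}
```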
@@ -32,67 +66,36 @@ impl Arena {
     pub fn new(size_in_bytes: usize) -> Self {
         unsafe {
             let layout = alloc::Layout::from_size_align(size_in_bytes, 1).unwrap();
             let start = alloc::alloc(layout);
             let end = start.add(size_in_bytes);
-            Self {
-                start,
-                end,
-                next_write: AtomicPtr::new(start),
+            let shared = Arc::new(SharedState {
+                start: AtomicPtr::new(start),
+                end: AtomicPtr::new(end),
                 trash_start: AtomicPtr::new(start),
                 trash_end: AtomicPtr::new(start),
+                drop_trash_is_running: Mutex::new(()),
+            });
+            Self {
+                shared,
+                next_write: start,
                 valid: Rc::new(Cell::new(true)),
             }
         }
     }
 
     pub fn len(&self) -> usize {
-        self.next_write.load(atomic::Ordering::Relaxed) as usize - self.start as usize
+        self.next_write as usize - self.shared.start() as usize
     }
 
     pub fn capacity(&self) -> usize {
-        self.end as usize - self.start as usize
+        self.shared.end() as usize - self.shared.start() as usize
     }
 
-    pub fn clear(&mut self) {
-        let must_drop_trash = self.trash_everything();
-        self.drop_trash(must_drop_trash);
-    }
-
-    pub fn trash_everything(&mut self) -> MustDropTrash {
+    pub fn trash_everything(&mut self) -> TrashHandle {
         self.valid.set(false);
         self.valid = Rc::new(Cell::new(true));
-        let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        unsafe {
-            // Help finish last `drop_trash` if still ongoing. This is so that there is only one
-            // concurrent `drop_trash` happening at a time.
-            self.ensure_no_trash_before(trash_end);
-        }
-        let next_write = self.next_write.load(atomic::Ordering::Relaxed);
-        let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-        assert!(trash_start == trash_end);
-        self.trash_end.store(next_write, atomic::Ordering::Relaxed);
-        MustDropTrash()
-    }
-
-    pub fn drop_trash(&self, _: MustDropTrash) {
-        let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-        let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        let mut offset = trash_start;
-        while ptr_is_within_circular_interval(offset, trash_start, trash_end) {
-            unsafe {
-                offset = drop_arena_element(offset);
-            }
-            self.trash_start.store(offset, atomic::Ordering::Relaxed);
-        }
-    }
-
-    unsafe fn ensure_no_trash_before(&self, target: *mut u8) {
-        let mut offset = self.trash_start.load(atomic::Ordering::Relaxed);
-        let mut trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-        if ptr_is_within_circular_interval(target, offset, trash_end) {
-            log::warn!("Unexpectedly needed to clear element arena trash on main thread.");
-            while ptr_is_within_circular_interval(target, offset, trash_end) {
-                offset = drop_arena_element(offset);
-            }
-        }
+        self.shared
+            .trash_end
+            .store(self.next_write, atomic::Ordering::Relaxed);
+        TrashHandle(self.shared.clone())
     }
 
     #[inline(always)]
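The constant-time invalidation in `trash_everything` hinges on the `Rc<Cell<bool>>` validity token: the `ArenaBox` handles carry clones of the arena's `valid` cell, so flipping the cell and then replacing the `Rc` orphans every outstanding handle at once, in O(1). A minimal sketch of that generation-token pattern (names invented here, not the patch's code):

```rust
use std::{cell::Cell, rc::Rc};

struct Owner {
    valid: Rc<Cell<bool>>,
}

struct Handle {
    valid: Rc<Cell<bool>>,
}

impl Owner {
    fn issue(&self) -> Handle {
        Handle { valid: self.valid.clone() }
    }

    // Constant time no matter how many handles are outstanding: old clones
    // still point at the flipped cell, while the owner starts a fresh
    // generation for handles issued afterwards.
    fn invalidate_all(&mut self) {
        self.valid.set(false);
        self.valid = Rc::new(Cell::new(true));
    }
}

fn main() {
    let mut owner = Owner { valid: Rc::new(Cell::new(true)) };
    let stale = owner.issue();
    owner.invalidate_all();
    let fresh = owner.issue();
    assert!(!stale.valid.get());
    assert!(fresh.valid.get());
}
```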
@@ -102,31 +105,31 @@ impl Arena {
         }
 
         unsafe {
-            let next_write = self.next_write.load(atomic::Ordering::Relaxed);
-            let trash_start = self.trash_start.load(atomic::Ordering::Relaxed);
-            let trash_end = self.trash_end.load(atomic::Ordering::Relaxed);
-
             let (header_ptr, offset) =
-                self.get_wrapped_allocation_ptrs::<ArenaElementHeader>(next_write);
+                self.get_wrapped_allocation_ptrs::<ArenaElementHeader>(self.next_write);
             let (value_ptr, offset) = self.get_wrapped_allocation_ptrs::<T>(offset);
-            assert!(offset <= self.end, "not enough space in Arena");
+            assert!(offset <= self.shared.end(), "not enough space in Arena");
+            let trash_start = self.shared.trash_start();
+            let trash_end = self.shared.trash_end();
             assert!(
-                !ptr_is_within_circular_interval(offset, trash_start, trash_end),
+                // FIXME: this is broken.
+                trash_start == trash_end
+                    || ptr_is_within_circular_interval(offset, self.next_write, trash_start),
                 "not enough space in Arena"
             );
-            self.ensure_no_trash_before(offset);
+            self.shared.ensure_no_trash_before(offset);
 
             ptr::write(
                 header_ptr,
                 ArenaElementHeader {
                     next_element: offset,
-                    value: value_ptr.cast(),
-                    drop_mutex: Mutex::new(drop::<T>),
+                    value: AtomicPtr::new(value_ptr.cast()),
+                    drop: drop::<T>,
                 },
             );
             ptr::write(value_ptr, f());
 
-            self.next_write.store(offset, atomic::Ordering::Relaxed);
+            self.next_write = offset;
 
             ArenaBox {
                 ptr: value_ptr,
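`alloc` can store mixed-type values in one buffer because each element header pairs the value pointer with a monomorphized drop function erased to `unsafe fn(*mut u8)` (the `drop: drop::<T>` field above). A stripped-down sketch of that type erasure, using heap boxes instead of arena memory (names invented for this sketch):

```rust
use std::ptr;

// Monomorphized dropper: one instance per concrete T, all sharing the same
// erased signature.
unsafe fn drop_value<T>(ptr: *mut u8) {
    ptr::drop_in_place(ptr.cast::<T>());
}

struct Erased {
    value: *mut u8,
    drop: unsafe fn(*mut u8),
}

fn erase<T>(v: T) -> Erased {
    Erased {
        value: Box::into_raw(Box::new(v)).cast(),
        drop: drop_value::<T>,
    }
}

fn main() {
    // Two different types behind the same header shape.
    let elements = vec![erase(String::from("hello")), erase(vec![1u32, 2, 3])];
    for e in elements {
        // Run the destructor without knowing the concrete type. This sketch
        // leaks the boxes' backing memory; the real arena recycles its ring
        // buffer instead, so no per-element free is needed.
        unsafe { (e.drop)(e.value) };
    }
}
```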
+ ); + while ptr_is_within_circular_interval(target, self.trash_start(), trash_end) { + // TODO: revisit choice of sleep duration + thread::sleep(Duration::from_micros(50)); + } + } + } + + fn start(&self) -> *mut u8 { + self.start.load(atomic::Ordering::Relaxed) + } + + fn end(&self) -> *mut u8 { + self.end.load(atomic::Ordering::Relaxed) + } + + fn trash_start(&self) -> *mut u8 { + self.trash_start.load(atomic::Ordering::Relaxed) + } + + fn trash_end(&self) -> *mut u8 { + self.trash_end.load(atomic::Ordering::Relaxed) + } +} + #[inline(always)] unsafe fn get_allocation_ptrs(offset: *mut u8) -> (*mut T, *mut u8) { let data_layout = alloc::Layout::new::(); @@ -156,7 +239,10 @@ unsafe fn get_allocation_ptrs(offset: *mut u8) -> (*mut T, *mut u8) { (data_ptr, offset.add(data_layout.size())) } -// TODO: many uses of this only actively use one of the bounds. +// FIXME: many uses of this only actively use one of the bounds. +// +// FIXME: start == end could either mean everything is included or nothing. Here it is nothing +// included, but are there corner cases where the actual state is everything? #[inline(always)] fn ptr_is_within_circular_interval(ptr: *const T, start: *const T, end: *const T) -> bool { if end < start { @@ -166,24 +252,23 @@ fn ptr_is_within_circular_interval(ptr: *const T, start: *const T, end: *cons } } -unsafe fn drop_arena_element(offset: *mut u8) -> *mut u8 { - let (header, _) = get_allocation_ptrs::(offset); - if (*header).value.is_null() { - return (*header).next_element; - } - let drop_fn = (*header).drop_mutex.lock(); - if (*header).value.is_null() { - return (*header).next_element; - } - (drop_fn)((*header).value); - (*header).value = ptr::null_mut(); - (*header).next_element +struct ArenaElementHeader { + next_element: *mut u8, + value: AtomicPtr, + drop: unsafe fn(*mut u8), } -impl Drop for Arena { - fn drop(&mut self) { - self.clear(); +unsafe fn drop_arena_element(offset: *mut u8) -> *mut u8 { + let (header, _) = get_allocation_ptrs::(offset); + let value = (*header) + .value + .swap(ptr::null_mut(), atomic::Ordering::AcqRel); + if value.is_null() { + return (*header).next_element; } + let drop_fn = (*header).drop; + (drop_fn)(value); + (*header).next_element } pub struct ArenaBox { @@ -270,7 +355,7 @@ mod tests { assert_eq!(*c, 3); assert_eq!(*d, 4); - arena.clear(); + arena.trash_everything().drop_trash(); let a = arena.alloc(|| 5u64); let b = arena.alloc(|| 6u32); let c = arena.alloc(|| 7u16); @@ -289,7 +374,7 @@ mod tests { } } arena.alloc(|| DropGuard(dropped.clone())); - arena.clear(); + arena.trash_everything().drop_trash(); assert!(dropped.get()); } @@ -328,7 +413,7 @@ mod tests { let mut arena = Arena::new(256); let value = arena.alloc(|| 1u64); - arena.clear(); + arena.trash_everything().drop_trash(); let _read_value = *value; } } diff --git a/crates/gpui/src/window.rs b/crates/gpui/src/window.rs index 37a71a8c84..8622bde448 100644 --- a/crates/gpui/src/window.rs +++ b/crates/gpui/src/window.rs @@ -117,6 +117,7 @@ slotmap::new_key_type! { thread_local! { /// 8MB wasn't quite enough... 
diff --git a/crates/gpui/src/window.rs b/crates/gpui/src/window.rs
index 37a71a8c84..8622bde448 100644
--- a/crates/gpui/src/window.rs
+++ b/crates/gpui/src/window.rs
@@ -117,6 +117,7 @@ slotmap::new_key_type! {
 
 thread_local! {
     /// 8MB wasn't quite enough...
+    /// TODO: make this blow up when called from other threads
     pub(crate) static ELEMENT_ARENA: RefCell<Arena> =
         RefCell::new(Arena::new(32 * 1024 * 1024));
 }
@@ -1441,24 +1442,28 @@ impl<'a> WindowContext<'a> {
         self.window
             .next_frame
             .finish(&mut self.window.rendered_frame);
-        let must_drop_trash = ELEMENT_ARENA.with_borrow_mut(|element_arena| {
+        let trash_handle = ELEMENT_ARENA.with_borrow_mut(|element_arena| {
             let percentage = (element_arena.len() as f32 / element_arena.capacity() as f32) * 100.;
             if percentage >= 80. {
                 log::warn!("elevated element arena occupation: {}.", percentage);
             }
             let start = SystemTime::now();
-            let must_drop_trash = element_arena.trash_everything();
+            let trash_handle = element_arena.trash_everything();
             log::error!(
-                "Clear time = {:?}",
+                "trash_everything elapsed time = {:?}",
                 SystemTime::now().duration_since(start).unwrap()
             );
-            must_drop_trash
+            trash_handle
         });
         self.app
             .background_executor()
             .spawn(async move {
-                // FIXME: doesn't work as it's thread local
-                ELEMENT_ARENA.with_borrow(|element_arena| element_arena.drop_trash(must_drop_trash))
+                let start = SystemTime::now();
+                trash_handle.drop_trash();
+                log::error!(
+                    "drop_trash elapsed time = {:?}",
+                    SystemTime::now().duration_since(start).unwrap()
+                );
             })
             .detach();
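Taken together, the intended per-frame lifecycle is: allocate UI element state on the foreground thread, trash everything in O(1) at frame end, and pay for the destructors off-thread. A rough usage sketch against the API introduced by this patch (assumes the `Arena` and `TrashHandle` types above are in scope; gpui itself goes through the background executor and a thread-local arena rather than a bare thread):

```rust
use std::thread;

fn frame_loop(mut arena: Arena) {
    for _frame in 0..3 {
        // Allocating thread: build this frame's elements.
        let label = arena.alloc(|| String::from("element state"));
        println!("built {}", *label);

        // End of frame: every live value becomes trash in constant time...
        let trash = arena.trash_everything();
        // ...and the destructors run elsewhere. `label` must not be used past
        // this point; its validity token was just invalidated.
        thread::spawn(move || trash.drop_trash());
    }
}
```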