From ebe835dd91dae193de4cd9fd7fa8bb648b17cb78 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 5 Sep 2025 10:37:36 +0200 Subject: [PATCH] Checkpoint --- crates/gpui/src/executor2.rs | 102 +----- crates/gpui/src/platform.rs | 26 +- crates/gpui/src/platform/linux/dispatcher.rs | 4 + crates/gpui/src/platform/mac.rs | 4 +- crates/gpui/src/platform/mac/platform.rs | 2 +- .../mac/{dispatcher.rs => scheduler.rs} | 0 crates/gpui/src/platform/test.rs | 4 +- crates/gpui/src/platform/test/dispatcher.rs | 308 ------------------ crates/scheduler/src/executor.rs | 71 +++- 9 files changed, 96 insertions(+), 425 deletions(-) rename crates/gpui/src/platform/mac/{dispatcher.rs => scheduler.rs} (100%) delete mode 100644 crates/gpui/src/platform/test/dispatcher.rs diff --git a/crates/gpui/src/executor2.rs b/crates/gpui/src/executor2.rs index bac9b38fc9..26f28fa0d9 100644 --- a/crates/gpui/src/executor2.rs +++ b/crates/gpui/src/executor2.rs @@ -1,12 +1,8 @@ use crate::App; -use async_task::Runnable; use futures::channel::mpsc; use scheduler::Timer; use smol::prelude::*; -use std::mem::ManuallyDrop; -use std::panic::Location; use std::sync::Arc; -use std::thread::{self, ThreadId}; use std::{ fmt::Debug, marker::PhantomData, @@ -123,8 +119,14 @@ impl BackgroundExecutor { where R: Send + 'static, { - // todo!("reduce monomorphization") - Task(self.0.spawn(future)) + fn inner( + executor: &scheduler::BackgroundExecutor, + future: AnyFuture, + ) -> Task { + Task(executor.spawn(future)) + } + + inner(&self.0, Box::pin(future)) } /// Enqueues the given future to be run to completion on a background thread. @@ -308,88 +310,16 @@ impl ForegroundExecutor { where R: 'static, { - // let dispatcher = self.dispatcher.clone(); - - // #[track_caller] - // fn inner( - // dispatcher: Arc, - // future: AnyLocalFuture, - // ) -> Task { - // let (runnable, task) = spawn_local_with_source_location(future, move |runnable| { - // dispatcher.dispatch_on_main_thread(runnable) - // }); - // runnable.schedule(); - // Task(TaskState::Spawned(task)) - // } - // inner::(dispatcher, Box::pin(future)) - // todo!("reduce monomorphization and spawn_local_with_source_location") - Task(self.0.spawn(future)) - } -} - -/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics. -/// -/// Copy-modified from: -/// -#[track_caller] -fn spawn_local_with_source_location( - future: Fut, - schedule: S, -) -> (Runnable<()>, async_task::Task) -where - Fut: Future + 'static, - Fut::Output: 'static, - S: async_task::Schedule<()> + Send + Sync + 'static, -{ - #[inline] - fn thread_id() -> ThreadId { - std::thread_local! { - static ID: ThreadId = thread::current().id(); + #[track_caller] + fn inner( + executor: &scheduler::ForegroundExecutor, + future: AnyLocalFuture, + ) -> Task { + Task(executor.spawn(future)) } - ID.try_with(|id| *id) - .unwrap_or_else(|_| thread::current().id()) + + inner::(&self.0, Box::pin(future)) } - - struct Checked { - id: ThreadId, - inner: ManuallyDrop, - location: &'static Location<'static>, - } - - impl Drop for Checked { - fn drop(&mut self) { - assert!( - self.id == thread_id(), - "local task dropped by a thread that didn't spawn it. Task spawned at {}", - self.location - ); - unsafe { - ManuallyDrop::drop(&mut self.inner); - } - } - } - - impl Future for Checked { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!( - self.id == thread_id(), - "local task polled by a thread that didn't spawn it. Task spawned at {}", - self.location - ); - unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } - } - } - - // Wrap the future into one that checks which thread it's on. - let future = Checked { - id: thread_id(), - inner: ManuallyDrop::new(future), - location: Location::caller(), - }; - - unsafe { async_task::spawn_unchecked(future, schedule) } } /// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`]. diff --git a/crates/gpui/src/platform.rs b/crates/gpui/src/platform.rs index 7e58a2c238..73fd393da7 100644 --- a/crates/gpui/src/platform.rs +++ b/crates/gpui/src/platform.rs @@ -41,14 +41,12 @@ use crate::{ ForegroundExecutor, GlyphId, GpuSpecs, ImageSource, Keymap, LineLayout, Pixels, PlatformInput, Point, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, ScaledPixels, Scene, ShapedGlyph, ShapedRun, SharedString, Size, SvgRenderer, SvgSize, SystemWindowTab, Task, - TaskLabel, Window, WindowControlArea, hash, point, px, size, + Window, WindowControlArea, hash, point, px, size, }; use anyhow::Result; -use async_task::Runnable; use futures::channel::oneshot; use image::codecs::gif::GifDecoder; use image::{AnimationDecoder as _, Frame}; -use parking::Unparker; use raw_window_handle::{HasDisplayHandle, HasWindowHandle}; use schemars::JsonSchema; use seahash::SeaHasher; @@ -58,7 +56,6 @@ use std::borrow::Cow; use std::hash::{Hash, Hasher}; use std::io::Cursor; use std::ops; -use std::time::{Duration, Instant}; use std::{ fmt::{self, Debug}, ops::Range, @@ -84,7 +81,7 @@ pub(crate) use test::*; pub(crate) use windows::*; #[cfg(any(test, feature = "test-support"))] -pub use test::{TestDispatcher, TestScheduler, TestSchedulerConfig, TestScreenCaptureSource}; +pub use test::{TestScheduler, TestSchedulerConfig, TestScreenCaptureSource}; /// Returns a background executor for the current platform. pub fn background_executor() -> BackgroundExecutor { @@ -556,25 +553,6 @@ pub(crate) trait PlatformWindow: HasWindowHandle + HasDisplayHandle { } } -/// This type is public so that our test macro can generate and use it, but it should not -/// be considered part of our public API. -#[doc(hidden)] -pub trait PlatformDispatcher: Send + Sync { - fn dispatch(&self, runnable: Runnable, label: Option); - fn dispatch_on_main_thread(&self, runnable: Runnable); - fn dispatch_after(&self, duration: Duration, runnable: Runnable); - fn park(&self, timeout: Option) -> bool; - fn unparker(&self) -> Unparker; - fn now(&self) -> Instant { - Instant::now() - } - - #[cfg(any(test, feature = "test-support"))] - fn as_test(&self) -> Option<&TestDispatcher> { - None - } -} - pub(crate) trait PlatformTextSystem: Send + Sync { fn add_fonts(&self, fonts: Vec>) -> Result<()>; fn all_font_names(&self) -> Vec; diff --git a/crates/gpui/src/platform/linux/dispatcher.rs b/crates/gpui/src/platform/linux/dispatcher.rs index 2d195ba043..3d32dbd2fd 100644 --- a/crates/gpui/src/platform/linux/dispatcher.rs +++ b/crates/gpui/src/platform/linux/dispatcher.rs @@ -97,6 +97,10 @@ impl LinuxDispatcher { } impl PlatformDispatcher for LinuxDispatcher { + fn is_main_thread(&self) -> bool { + thread::current().id() == self.main_thread_id + } + fn dispatch(&self, runnable: Runnable, _: Option) { self.background_sender.send(runnable).unwrap(); } diff --git a/crates/gpui/src/platform/mac.rs b/crates/gpui/src/platform/mac.rs index 76d636b457..8ca9ee6a0d 100644 --- a/crates/gpui/src/platform/mac.rs +++ b/crates/gpui/src/platform/mac.rs @@ -1,10 +1,10 @@ //! Macos screen have a y axis that goings up from the bottom of the screen and //! an origin at the bottom left of the main display. -mod dispatcher; mod display; mod display_link; mod events; mod keyboard; +mod scheduler; #[cfg(feature = "screen-capture")] mod screen_capture; @@ -45,11 +45,11 @@ use std::{ ops::Range, }; -pub(crate) use dispatcher::*; pub(crate) use display::*; pub(crate) use display_link::*; pub(crate) use keyboard::*; pub(crate) use platform::*; +pub(crate) use scheduler::*; pub(crate) use window::*; #[cfg(feature = "font-kit")] diff --git a/crates/gpui/src/platform/mac/platform.rs b/crates/gpui/src/platform/mac/platform.rs index 246165ba5a..bf418475c4 100644 --- a/crates/gpui/src/platform/mac/platform.rs +++ b/crates/gpui/src/platform/mac/platform.rs @@ -504,7 +504,7 @@ impl Platform for MacPlatform { // this, we make quitting the application asynchronous so that we aren't holding borrows to // the app state on the stack when we actually terminate the app. - use super::dispatcher::{dispatch_get_main_queue, dispatch_sys::dispatch_async_f}; + use super::scheduler::{dispatch_get_main_queue, dispatch_sys::dispatch_async_f}; unsafe { dispatch_async_f(dispatch_get_main_queue(), ptr::null_mut(), Some(quit)); diff --git a/crates/gpui/src/platform/mac/dispatcher.rs b/crates/gpui/src/platform/mac/scheduler.rs similarity index 100% rename from crates/gpui/src/platform/mac/dispatcher.rs rename to crates/gpui/src/platform/mac/scheduler.rs diff --git a/crates/gpui/src/platform/test.rs b/crates/gpui/src/platform/test.rs index 1d8d4fb655..ef03c91c61 100644 --- a/crates/gpui/src/platform/test.rs +++ b/crates/gpui/src/platform/test.rs @@ -1,12 +1,10 @@ -mod dispatcher; mod display; mod platform; mod window; -pub use dispatcher::*; pub(crate) use display::*; pub(crate) use platform::*; -pub use scheduler::{Scheduler, TestScheduler, TestSchedulerConfig}; +pub use scheduler::{TestScheduler, TestSchedulerConfig}; pub(crate) use window::*; pub use platform::TestScreenCaptureSource; diff --git a/crates/gpui/src/platform/test/dispatcher.rs b/crates/gpui/src/platform/test/dispatcher.rs deleted file mode 100644 index 6952d24f44..0000000000 --- a/crates/gpui/src/platform/test/dispatcher.rs +++ /dev/null @@ -1,308 +0,0 @@ -use crate::{PlatformDispatcher, TaskLabel}; -use async_task::Runnable; -use backtrace::Backtrace; -use collections::{HashMap, HashSet, VecDeque}; -use parking::{Parker, Unparker}; -use parking_lot::Mutex; -use rand::prelude::*; -use std::{ - future::Future, - ops::RangeInclusive, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - time::{Duration, Instant}, -}; -use util::post_inc; - -#[derive(Copy, Clone, PartialEq, Eq, Hash)] -struct TestDispatcherId(usize); - -#[doc(hidden)] -pub struct TestDispatcher { - id: TestDispatcherId, - state: Arc>, - parker: Arc>, - unparker: Unparker, -} - -struct TestDispatcherState { - random: StdRng, - foreground: HashMap>, - background: Vec, - deprioritized_background: Vec, - delayed: Vec<(Duration, Runnable)>, - start_time: Instant, - time: Duration, - is_main_thread: bool, - next_id: TestDispatcherId, - allow_parking: bool, - waiting_hint: Option, - waiting_backtrace: Option, - deprioritized_task_labels: HashSet, - block_on_ticks: RangeInclusive, -} - -impl TestDispatcher { - pub fn new(random: StdRng) -> Self { - let (parker, unparker) = parking::pair(); - let state = TestDispatcherState { - random, - foreground: HashMap::default(), - background: Vec::new(), - deprioritized_background: Vec::new(), - delayed: Vec::new(), - time: Duration::ZERO, - start_time: Instant::now(), - is_main_thread: true, - next_id: TestDispatcherId(1), - allow_parking: false, - waiting_hint: None, - waiting_backtrace: None, - deprioritized_task_labels: Default::default(), - block_on_ticks: 0..=1000, - }; - - TestDispatcher { - id: TestDispatcherId(0), - state: Arc::new(Mutex::new(state)), - parker: Arc::new(Mutex::new(parker)), - unparker, - } - } - - pub fn advance_clock(&self, by: Duration) { - let new_now = self.state.lock().time + by; - loop { - self.run_until_parked(); - let state = self.state.lock(); - let next_due_time = state.delayed.first().map(|(time, _)| *time); - drop(state); - if let Some(due_time) = next_due_time - && due_time <= new_now - { - self.state.lock().time = due_time; - continue; - } - break; - } - self.state.lock().time = new_now; - } - - pub fn advance_clock_to_next_delayed(&self) -> bool { - let next_due_time = self.state.lock().delayed.first().map(|(time, _)| *time); - if let Some(next_due_time) = next_due_time { - self.state.lock().time = next_due_time; - return true; - } - false - } - - pub fn simulate_random_delay(&self) -> impl 'static + Send + Future + use<> { - struct YieldNow { - pub(crate) count: usize, - } - - impl Future for YieldNow { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - if self.count > 0 { - self.count -= 1; - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(()) - } - } - } - - YieldNow { - count: self.state.lock().random.random_range(0..10), - } - } - - pub fn tick(&self, background_only: bool) -> bool { - let mut state = self.state.lock(); - - while let Some((deadline, _)) = state.delayed.first() { - if *deadline > state.time { - break; - } - let (_, runnable) = state.delayed.remove(0); - state.background.push(runnable); - } - - let foreground_len: usize = if background_only { - 0 - } else { - state - .foreground - .values() - .map(|runnables| runnables.len()) - .sum() - }; - let background_len = state.background.len(); - - let runnable; - let main_thread; - if foreground_len == 0 && background_len == 0 { - let deprioritized_background_len = state.deprioritized_background.len(); - if deprioritized_background_len == 0 { - return false; - } - let ix = state.random.random_range(0..deprioritized_background_len); - main_thread = false; - runnable = state.deprioritized_background.swap_remove(ix); - } else { - main_thread = state.random.random_ratio( - foreground_len as u32, - (foreground_len + background_len) as u32, - ); - if main_thread { - let state = &mut *state; - runnable = state - .foreground - .values_mut() - .filter(|runnables| !runnables.is_empty()) - .choose(&mut state.random) - .unwrap() - .pop_front() - .unwrap(); - } else { - let ix = state.random.random_range(0..background_len); - runnable = state.background.swap_remove(ix); - }; - }; - - let was_main_thread = state.is_main_thread; - state.is_main_thread = main_thread; - drop(state); - runnable.run(); - self.state.lock().is_main_thread = was_main_thread; - - true - } - - pub fn deprioritize(&self, task_label: TaskLabel) { - self.state - .lock() - .deprioritized_task_labels - .insert(task_label); - } - - pub fn run_until_parked(&self) { - while self.tick(false) {} - } - - pub fn parking_allowed(&self) -> bool { - self.state.lock().allow_parking - } - - pub fn allow_parking(&self) { - self.state.lock().allow_parking = true - } - - pub fn forbid_parking(&self) { - self.state.lock().allow_parking = false - } - - pub fn set_waiting_hint(&self, msg: Option) { - self.state.lock().waiting_hint = msg - } - - pub fn waiting_hint(&self) -> Option { - self.state.lock().waiting_hint.clone() - } - - pub fn start_waiting(&self) { - self.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved()); - } - - pub fn finish_waiting(&self) { - self.state.lock().waiting_backtrace.take(); - } - - pub fn waiting_backtrace(&self) -> Option { - self.state.lock().waiting_backtrace.take().map(|mut b| { - b.resolve(); - b - }) - } - - pub fn rng(&self) -> StdRng { - self.state.lock().random.clone() - } - - pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive) { - self.state.lock().block_on_ticks = range; - } - - pub fn gen_block_on_ticks(&self) -> usize { - let mut lock = self.state.lock(); - let block_on_ticks = lock.block_on_ticks.clone(); - lock.random.random_range(block_on_ticks) - } -} - -impl Clone for TestDispatcher { - fn clone(&self) -> Self { - let id = post_inc(&mut self.state.lock().next_id.0); - Self { - id: TestDispatcherId(id), - state: self.state.clone(), - parker: self.parker.clone(), - unparker: self.unparker.clone(), - } - } -} - -impl PlatformDispatcher for TestDispatcher { - fn now(&self) -> Instant { - let state = self.state.lock(); - state.start_time + state.time - } - - fn dispatch(&self, runnable: Runnable, label: Option) { - { - let mut state = self.state.lock(); - if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) { - state.deprioritized_background.push(runnable); - } else { - state.background.push(runnable); - } - } - self.unparker.unpark(); - } - - fn dispatch_on_main_thread(&self, runnable: Runnable) { - self.state - .lock() - .foreground - .entry(self.id) - .or_default() - .push_back(runnable); - self.unparker.unpark(); - } - - fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) { - let mut state = self.state.lock(); - let next_time = state.time + duration; - let ix = match state.delayed.binary_search_by_key(&next_time, |e| e.0) { - Ok(ix) | Err(ix) => ix, - }; - state.delayed.insert(ix, (next_time, runnable)); - } - fn park(&self, _: Option) -> bool { - self.parker.lock().park(); - true - } - - fn unparker(&self) -> Unparker { - self.unparker.clone() - } - - fn as_test(&self) -> Option<&TestDispatcher> { - Some(self) - } -} diff --git a/crates/scheduler/src/executor.rs b/crates/scheduler/src/executor.rs index 1dab9764d0..730f6467f2 100644 --- a/crates/scheduler/src/executor.rs +++ b/crates/scheduler/src/executor.rs @@ -4,10 +4,13 @@ use futures::FutureExt as _; use std::{ future::Future, marker::PhantomData, + mem::ManuallyDrop, + panic::Location, pin::Pin, rc::Rc, sync::Arc, task::{Context, Poll}, + thread::{self, ThreadId}, time::Duration, }; @@ -19,6 +22,7 @@ pub struct ForegroundExecutor { } impl ForegroundExecutor { + #[track_caller] pub fn spawn(&self, future: F) -> Task where F: Future + 'static, @@ -26,7 +30,7 @@ impl ForegroundExecutor { { let session_id = self.session_id; let scheduler = Arc::clone(&self.scheduler); - let (runnable, task) = async_task::spawn_local(future, move |runnable| { + let (runnable, task) = spawn_local_with_source_location(future, move |runnable| { scheduler.schedule_foreground(session_id, runnable); }); runnable.schedule(); @@ -161,3 +165,68 @@ impl Future for Task { } } } + +/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics. +/// +/// Copy-modified from: +/// +#[track_caller] +fn spawn_local_with_source_location( + future: Fut, + schedule: S, +) -> (async_task::Runnable, async_task::Task) +where + Fut: Future + 'static, + Fut::Output: 'static, + S: async_task::Schedule + Send + Sync + 'static, +{ + #[inline] + fn thread_id() -> ThreadId { + std::thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked { + id: ThreadId, + inner: ManuallyDrop, + location: &'static Location<'static>, + } + + impl Drop for Checked { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it. Task spawned at {}", + self.location + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl Future for Checked { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it. Task spawned at {}", + self.location + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + location: Location::caller(), + }; + + unsafe { async_task::spawn_unchecked(future, schedule) } +}