Remove waiting_hint/waiting_backtrace and debug logging from TestDispatcher
Further simplification of TestDispatcher: - Remove waiting_hint and waiting_backtrace (scheduler has PENDING_TRACES) - Remove start_waiting/finish_waiting/set_waiting_hint methods - Remove block_on_ticks (use rng directly for timeout ticks) - Remove all dispatcher_log! debug logging infrastructure - Remove set_block_on_ticks from BackgroundExecutor TestDispatcher is now 263 lines (down from 493). TestDispatcherState now only contains: - delayed: Vec<(Instant, RunnableVariant)> - for dispatch_after - is_main_thread: bool - for PlatformDispatcher API - unparkers: Vec<Unparker> - for block_on wakeup
This commit is contained in:
@@ -181,8 +181,6 @@ impl FakeServer {
|
||||
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
|
||||
self.executor.start_waiting();
|
||||
|
||||
let message = self
|
||||
.state
|
||||
.lock()
|
||||
@@ -192,7 +190,6 @@ impl FakeServer {
|
||||
.next()
|
||||
.await
|
||||
.context("other half hung up")?;
|
||||
self.executor.finish_waiting();
|
||||
let type_name = message.payload_type_name();
|
||||
let message = message.into_any();
|
||||
|
||||
|
||||
@@ -663,11 +663,9 @@ impl<V> Entity<V> {
|
||||
}
|
||||
}
|
||||
|
||||
cx.borrow().background_executor().start_waiting();
|
||||
rx.recv()
|
||||
.await
|
||||
.expect("view dropped with pending condition");
|
||||
cx.borrow().background_executor().finish_waiting();
|
||||
}
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{App, PlatformDispatcher, RunnableMeta, TaskTiming, profiler};
|
||||
use rand::Rng as _;
|
||||
|
||||
pub use scheduler::{Priority, RealtimePriority};
|
||||
|
||||
@@ -438,7 +439,7 @@ impl BackgroundExecutor {
|
||||
};
|
||||
|
||||
let mut max_ticks = if timeout.is_some() {
|
||||
dispatcher.gen_block_on_ticks()
|
||||
dispatcher.rng().lock().random_range(0..=1000)
|
||||
} else {
|
||||
usize::MAX
|
||||
};
|
||||
@@ -493,18 +494,7 @@ impl BackgroundExecutor {
|
||||
if advanced {
|
||||
continue;
|
||||
}
|
||||
let mut backtrace_message = String::new();
|
||||
let mut waiting_message = String::new();
|
||||
if let Some(backtrace) = dispatcher.waiting_backtrace() {
|
||||
backtrace_message =
|
||||
format!("\nbacktrace of waiting future:\n{:?}", backtrace);
|
||||
}
|
||||
if let Some(waiting_hint) = dispatcher.waiting_hint() {
|
||||
waiting_message = format!("\n waiting on: {}\n", waiting_hint);
|
||||
}
|
||||
panic!(
|
||||
"parked with nothing left to run{waiting_message}{backtrace_message}",
|
||||
)
|
||||
panic!("parked with nothing left to run")
|
||||
}
|
||||
|
||||
let unparker_count_before = dispatcher.unparker_count();
|
||||
@@ -604,17 +594,7 @@ impl BackgroundExecutor {
|
||||
Task(TaskState::Spawned(task))
|
||||
}
|
||||
|
||||
/// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn start_waiting(&self) {
|
||||
self.dispatcher.as_test().unwrap().start_waiting();
|
||||
}
|
||||
|
||||
/// in tests, removes the debugging data added by start_waiting
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn finish_waiting(&self) {
|
||||
self.dispatcher.as_test().unwrap().finish_waiting();
|
||||
}
|
||||
|
||||
/// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
@@ -657,11 +637,7 @@ impl BackgroundExecutor {
|
||||
self.dispatcher.as_test().unwrap().forbid_parking();
|
||||
}
|
||||
|
||||
/// adds detail to the "parked with nothing let to run" message.
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub fn set_waiting_hint(&self, msg: Option<String>) {
|
||||
self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
|
||||
}
|
||||
|
||||
|
||||
/// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
@@ -683,11 +659,7 @@ impl BackgroundExecutor {
|
||||
self.dispatcher.is_main_thread()
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
/// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
|
||||
pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
|
||||
self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// ForegroundExecutor runs things on the main thread.
|
||||
|
||||
@@ -1,70 +1,21 @@
|
||||
use crate::{PlatformDispatcher, Priority, RunnableVariant, TaskLabel};
|
||||
use backtrace::Backtrace;
|
||||
use parking::Unparker;
|
||||
use parking_lot::Mutex;
|
||||
use rand::prelude::*;
|
||||
use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig};
|
||||
use std::{
|
||||
future::Future,
|
||||
ops::RangeInclusive,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
static DISPATCHER_LOG_ENABLED: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(false);
|
||||
static DISPATCHER_LOG_INITIALIZED: std::sync::atomic::AtomicBool =
|
||||
std::sync::atomic::AtomicBool::new(false);
|
||||
static DISPATCHER_EVENT_SEQ: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// Enable detailed dispatcher logging for debugging race conditions
|
||||
#[allow(dead_code)]
|
||||
pub fn enable_dispatcher_logging() {
|
||||
DISPATCHER_LOG_ENABLED.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Disable detailed dispatcher logging
|
||||
#[allow(dead_code)]
|
||||
pub fn disable_dispatcher_logging() {
|
||||
DISPATCHER_LOG_ENABLED.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn check_dispatcher_log_init() {
|
||||
if !DISPATCHER_LOG_INITIALIZED.load(Ordering::Relaxed) {
|
||||
let enabled = std::env::var("DEBUG_SCHEDULER")
|
||||
.map(|v| v == "1")
|
||||
.unwrap_or(false);
|
||||
DISPATCHER_LOG_ENABLED.store(enabled, Ordering::SeqCst);
|
||||
DISPATCHER_LOG_INITIALIZED.store(true, Ordering::SeqCst);
|
||||
if enabled {
|
||||
eprintln!("[DISP] Dispatcher debugging enabled via DEBUG_SCHEDULER=1");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! dispatcher_log {
|
||||
($($arg:tt)*) => {
|
||||
{
|
||||
check_dispatcher_log_init();
|
||||
if DISPATCHER_LOG_ENABLED.load(Ordering::Relaxed) {
|
||||
let seq = DISPATCHER_EVENT_SEQ.fetch_add(1, Ordering::SeqCst);
|
||||
eprintln!("[DISP {:>6}] [thread {:?}] {}", seq, std::thread::current().id(), format!($($arg)*));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// TestDispatcher provides deterministic async execution for tests.
|
||||
///
|
||||
/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`,
|
||||
/// which provides timing, clock, randomization, and task queue management. The dispatcher
|
||||
/// maintains only the delayed task queue (for dispatch_after) and state needed for
|
||||
/// GPUI-specific behavior like is_main_thread tracking.
|
||||
/// maintains only state needed for GPUI's `PlatformDispatcher` API compatibility.
|
||||
#[doc(hidden)]
|
||||
pub struct TestDispatcher {
|
||||
session_id: SessionId,
|
||||
@@ -75,9 +26,6 @@ pub struct TestDispatcher {
|
||||
struct TestDispatcherState {
|
||||
delayed: Vec<(Instant, RunnableVariant)>,
|
||||
is_main_thread: bool,
|
||||
waiting_hint: Option<String>,
|
||||
waiting_backtrace: Option<Backtrace>,
|
||||
block_on_ticks: RangeInclusive<usize>,
|
||||
unparkers: Vec<Unparker>,
|
||||
}
|
||||
|
||||
@@ -97,10 +45,7 @@ impl TestDispatcher {
|
||||
let state = TestDispatcherState {
|
||||
delayed: Vec::new(),
|
||||
is_main_thread: true,
|
||||
waiting_hint: None,
|
||||
waiting_backtrace: None,
|
||||
block_on_ticks: 0..=1000,
|
||||
unparkers: Default::default(),
|
||||
unparkers: Vec::new(),
|
||||
};
|
||||
|
||||
TestDispatcher {
|
||||
@@ -144,7 +89,7 @@ impl TestDispatcher {
|
||||
|
||||
pub fn simulate_random_delay(&self) -> impl 'static + Send + Future<Output = ()> + use<> {
|
||||
struct YieldNow {
|
||||
pub(crate) count: usize,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl Future for YieldNow {
|
||||
@@ -182,15 +127,8 @@ impl TestDispatcher {
|
||||
}
|
||||
|
||||
let (foreground_count, background_count) = self.scheduler.pending_task_counts();
|
||||
let delayed_count = self.state.lock().delayed.len();
|
||||
|
||||
if foreground_count == 0 && background_count == 0 {
|
||||
dispatcher_log!(
|
||||
"tick() -> false (no tasks) | fg={} bg={} delayed={}",
|
||||
foreground_count,
|
||||
background_count,
|
||||
delayed_count
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -210,16 +148,6 @@ impl TestDispatcher {
|
||||
let was_main_thread = self.state.lock().is_main_thread;
|
||||
self.state.lock().is_main_thread = run_foreground;
|
||||
|
||||
dispatcher_log!(
|
||||
"tick() running {} task | fg={} bg={} delayed={} main_thread={}",
|
||||
if run_foreground { "foreground" } else { "background" },
|
||||
foreground_count,
|
||||
background_count,
|
||||
delayed_count,
|
||||
run_foreground,
|
||||
);
|
||||
|
||||
// Run a task through the scheduler
|
||||
let did_work = if background_only {
|
||||
self.scheduler.tick_background_only()
|
||||
} else {
|
||||
@@ -247,66 +175,19 @@ impl TestDispatcher {
|
||||
self.scheduler.forbid_parking();
|
||||
}
|
||||
|
||||
pub fn set_waiting_hint(&self, msg: Option<String>) {
|
||||
self.state.lock().waiting_hint = msg;
|
||||
}
|
||||
|
||||
pub fn waiting_hint(&self) -> Option<String> {
|
||||
self.state.lock().waiting_hint.clone()
|
||||
}
|
||||
|
||||
pub fn start_waiting(&self) {
|
||||
self.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
|
||||
}
|
||||
|
||||
pub fn finish_waiting(&self) {
|
||||
self.state.lock().waiting_backtrace.take();
|
||||
}
|
||||
|
||||
pub fn waiting_backtrace(&self) -> Option<Backtrace> {
|
||||
self.state.lock().waiting_backtrace.as_ref().map(|trace| {
|
||||
let mut trace = trace.clone();
|
||||
trace.resolve();
|
||||
trace
|
||||
})
|
||||
}
|
||||
|
||||
pub fn rng(&self) -> Arc<parking_lot::Mutex<StdRng>> {
|
||||
self.scheduler.rng()
|
||||
}
|
||||
|
||||
pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
|
||||
self.state.lock().block_on_ticks = range;
|
||||
}
|
||||
|
||||
pub fn gen_block_on_ticks(&self) -> usize {
|
||||
let range = self.state.lock().block_on_ticks.clone();
|
||||
self.scheduler.rng().lock().random_range(range)
|
||||
}
|
||||
|
||||
pub fn unpark_all(&self) {
|
||||
let unparkers: Vec<_> = self.state.lock().unparkers.drain(..).collect();
|
||||
let count = unparkers.len();
|
||||
if count > 0 {
|
||||
dispatcher_log!("unpark_all() | unparking {} threads", count);
|
||||
}
|
||||
for unparker in unparkers {
|
||||
unparker.unpark();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_unparker(&self, unparker: Unparker) {
|
||||
let mut state = self.state.lock();
|
||||
let count_before = state.unparkers.len();
|
||||
state.unparkers.push(unparker);
|
||||
let (fg, bg) = self.scheduler.pending_task_counts();
|
||||
dispatcher_log!(
|
||||
"push_unparker() | unparkers: {} -> {} | fg={} bg={}",
|
||||
count_before,
|
||||
state.unparkers.len(),
|
||||
fg,
|
||||
bg
|
||||
);
|
||||
self.state.lock().unparkers.push(unparker);
|
||||
}
|
||||
|
||||
pub fn unparker_count(&self) -> usize {
|
||||
@@ -314,7 +195,6 @@ impl TestDispatcher {
|
||||
}
|
||||
|
||||
/// Returns a reference to the underlying TestScheduler.
|
||||
/// This can be used for advanced testing scenarios that need direct scheduler access.
|
||||
pub fn scheduler(&self) -> &Arc<TestScheduler> {
|
||||
&self.scheduler
|
||||
}
|
||||
@@ -349,33 +229,13 @@ impl PlatformDispatcher for TestDispatcher {
|
||||
}
|
||||
|
||||
fn dispatch(&self, runnable: RunnableVariant, _label: Option<TaskLabel>, priority: Priority) {
|
||||
let (fg, bg) = self.scheduler.pending_task_counts();
|
||||
let unparkers_before = self.state.lock().unparkers.len();
|
||||
|
||||
self.scheduler
|
||||
.schedule_background_with_priority(runnable, priority);
|
||||
|
||||
dispatcher_log!(
|
||||
"dispatch() | fg={} bg={} unparkers_at_dispatch={} (about to unpark_all)",
|
||||
fg,
|
||||
bg + 1,
|
||||
unparkers_before
|
||||
);
|
||||
self.unpark_all();
|
||||
}
|
||||
|
||||
fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
|
||||
let (fg, bg) = self.scheduler.pending_task_counts();
|
||||
let unparkers_before = self.state.lock().unparkers.len();
|
||||
|
||||
self.scheduler.schedule_foreground(self.session_id, runnable);
|
||||
|
||||
dispatcher_log!(
|
||||
"dispatch_on_main_thread() | fg={} bg={} unparkers_at_dispatch={} (about to unpark_all)",
|
||||
fg + 1,
|
||||
bg,
|
||||
unparkers_before
|
||||
);
|
||||
self.unpark_all();
|
||||
}
|
||||
|
||||
|
||||
@@ -135,7 +135,6 @@ impl TestPlatform {
|
||||
.new_path
|
||||
.pop_front()
|
||||
.expect("no pending new path prompt");
|
||||
self.background_executor().set_waiting_hint(None);
|
||||
tx.send(Ok(select_path(&path))).ok();
|
||||
}
|
||||
|
||||
@@ -147,7 +146,6 @@ impl TestPlatform {
|
||||
.multiple_choice
|
||||
.pop_front()
|
||||
.expect("no pending multiple choice prompt");
|
||||
self.background_executor().set_waiting_hint(None);
|
||||
let Some(ix) = prompt.answers.iter().position(|a| a == response) else {
|
||||
panic!(
|
||||
"PROMPT: {}\n{:?}\n{:?}\nCannot respond with {}",
|
||||
@@ -182,8 +180,6 @@ impl TestPlatform {
|
||||
) -> oneshot::Receiver<usize> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let answers: Vec<String> = answers.iter().map(|s| s.label().to_string()).collect();
|
||||
self.background_executor()
|
||||
.set_waiting_hint(Some(format!("PROMPT: {:?} {:?}", msg, detail)));
|
||||
self.prompts
|
||||
.borrow_mut()
|
||||
.multiple_choice
|
||||
@@ -348,8 +344,6 @@ impl Platform for TestPlatform {
|
||||
_suggested_name: Option<&str>,
|
||||
) -> oneshot::Receiver<Result<Option<std::path::PathBuf>>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.background_executor()
|
||||
.set_waiting_hint(Some(format!("PROMPT FOR PATH: {:?}", directory)));
|
||||
self.prompts
|
||||
.borrow_mut()
|
||||
.new_path
|
||||
|
||||
@@ -1735,13 +1735,11 @@ impl FakeLanguageServer {
|
||||
T: request::Request,
|
||||
T::Result: 'static + Send,
|
||||
{
|
||||
self.server.executor.start_waiting();
|
||||
self.server.request::<T>(params).await
|
||||
}
|
||||
|
||||
/// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
|
||||
pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
|
||||
self.server.executor.start_waiting();
|
||||
self.try_receive_notification::<T>().await.unwrap()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user