Compare commits

...

12 Commits

Author SHA1 Message Date
Nathan Sobo
98db674dba Modify test_background_task_with_foreground_wait to use oneshot channel for bg-fg communication 2025-08-31 22:06:05 -06:00
Nathan Sobo
b303a76922 Fix compile errors in scheduler crate: correct syntax error in tick method and Send trait bounds in spawn_labeled 2025-08-31 22:02:33 -06:00
Nathan Sobo
588237ce06 Add scheduler crate with unified TestScheduler and ForegroundExecutor
- Created new shared crate at crates/scheduler/
- Implemented minimal Scheduler trait with schedule_foreground and is_main_thread
- Added TestScheduler with deterministic execution and basic task queuing
- Added ForegroundExecutor wrapper for !Send futures
- Included test_basic_spawn_and_run passing test
- Follows Zed Rust guidelines: parking_lot Mutex, shadowing in scopes
2025-08-31 21:12:22 -06:00
Nathan Sobo
571693795e Update scheduler architecture plans with Executor and ForegroundExecutor wrappers 2025-08-31 19:42:23 -06:00
Nathan Sobo
d3d1be4fba Update TestScheduler design with SchedulerConfig, VariationSource enum, and SchedulerFuzzInput for fuzzing support 2025-08-31 19:06:23 -06:00
Nathan Sobo
1c1649dc61 Restructure unified scheduler architecture: separate GPUI/Cloud sections with comprehensive integration coverage
- Add dedicated GPUI Integration section with BackgroundExecutor/ForegroundExecutor
- Add dedicated Cloud Integration section with SimulatedExecutionContext session coordination
- Clarify session coordination approach: handled directly in Cloud's SimulatedExecutionContext
- Update migration strategies for both domains
- Balance coverage depth between GPUI and Cloud
- Change present-tense to future-tense planning language
- Reorganize platform backend implementations clearly
- Update architecture diagram to show domain-specific integration
2025-08-31 18:07:02 -06:00
Nathan Sobo
26d6d825a8 Update unified scheduler architecture: clarify platform implementations remain in GPUI, update to future tense language for planning document 2025-08-31 17:56:38 -06:00
Nathan Sobo
28bd412d62 Update unified scheduler plan with complete GPUI feature coverage
- Add comprehensive GPUI test methods (tick, run_until_parked, advance_clock, etc.)
- Add enhanced generic helpers for timer, blocking, and other GPUI APIs
- Update TestScheduler with all missing fields and methods
- Ensure object-safe trait supports all current BackgroundExecutor/ForegroundExecutor methods
- Confirm 100% GPUI API compatibility with existing code
2025-08-31 17:49:58 -06:00
Nathan Sobo
263eae363d Update unified scheduler architecture to use object-safe trait with generic spawn helpers
- Make Scheduler trait object-safe by replacing generic spawn<R>() with schedule(runnable)
- Add generic spawn helpers on dyn Scheduler to preserve Future-based API
- Move task state management internally to scheduler implementations
- Update GPUI integration to use trait objects with spawn helpers
- Enhance Cloud integration with automatic session association
- Resolve Runnable vs Future interface mismatch while maintaining backwards compatibility
2025-08-31 17:44:28 -06:00
Nathan Sobo
eac2ce7d84 Add comprehensive implementation reference section
- GPUI and Cloud file paths with specific types to update
- Migration points for each area
- Platform backend files for each OS
- Compatibility checklist for all features
- Step-by-step implementation roadmap
- Test files that will be impacted

Provides complete reference for implementing the unified scheduler architecture.
2025-08-31 17:35:49 -06:00
Nathan Sobo
de19c379f9 Update unified scheduler architecture plan with final layered design
- Clean separation: spawn_labeled on main trait (GPUI API), deprioritize only on TestScheduler (test infrastructure)
- Complete code listings with explanations
- Layered architecture: core Scheduler trait + TestScheduler + Cloud wrapper
- Production-safe: GPUI executors use trait objects, Cloud uses TestScheduler directly
- Full feature set: task coordination, session management, test determinism
2025-08-31 17:33:46 -06:00
Nathan Sobo
633345d04b docs: unified scheduler architecture design
- Define single Scheduler trait with low-level primitives and extension methods
- Centralize Task<T> and TaskId in scheduler crate
- Enable Send/non-Send future scheduling with main-thread safety
- Provide delayed scheduling via schedule_at for timer functionality
- Maintain GPUI/Cloud compatibility while unifying platform backends
- Establish foundation for deterministic testing and observability
2025-08-31 16:53:49 -06:00
6 changed files with 1226 additions and 0 deletions

12
Cargo.lock generated
View File

@@ -14349,6 +14349,18 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scheduler"
version = "0.1.0"
dependencies = [
"anyhow",
"async-task",
"futures 0.3.31",
"parking_lot",
"rand 0.8.5",
"rand_chacha 0.3.1",
]
[[package]]
name = "schema_generator"
version = "0.1.0"

View File

@@ -140,6 +140,7 @@ members = [
"crates/rpc",
"crates/rules_library",
"crates/schema_generator",
"crates/scheduler",
"crates/search",
"crates/semantic_index",
"crates/semantic_version",

View File

@@ -0,0 +1,18 @@
[package]
name = "scheduler"
version = "0.1.0"
edition = "2021"
[lib]
path = "src/scheduler.rs"
[dependencies]
anyhow = "1.0"
async-task = "4.5"
futures = "0.3"
rand = { version = "0.8", features = ["small_rng"] }
rand_chacha = "0.3"
parking_lot = "0.12"
[features]
test-support = []

View File

@@ -0,0 +1,215 @@
use anyhow::Result;
use async_task::Runnable;
use parking_lot::Mutex;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::any::Any;
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use futures::channel::oneshot;
use futures::executor;
use std::thread::{self, ThreadId};
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);
pub trait Scheduler: Send + Sync + Any {
fn schedule(&self, runnable: Runnable, label: Option<TaskLabel>);
fn schedule_foreground(&self, runnable: Runnable, label: Option<TaskLabel>);
fn is_main_thread(&self) -> bool;
}
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);
pub struct Task<R>(async_task::Task<R>);
impl<R> Task<R> {
pub fn id(&self) -> TaskId {
TaskId(0) // Placeholder
}
}
impl Default for TaskLabel {
fn default() -> Self {
TaskLabel(0)
}
}
pub struct SchedulerConfig {
pub randomize_order: bool,
pub seed: u64,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
randomize_order: true,
seed: 0,
}
}
}
pub struct TestScheduler {
inner: Mutex<TestSchedulerInner>,
}
struct TestSchedulerInner {
rng: ChaCha8Rng,
foreground_queue: VecDeque<Runnable>,
creation_thread_id: ThreadId,
}
impl TestScheduler {
pub fn new(config: SchedulerConfig) -> Self {
Self {
inner: Mutex::new(TestSchedulerInner {
rng: ChaCha8Rng::seed_from_u64(config.seed),
foreground_queue: VecDeque::new(),
creation_thread_id: thread::current().id(),
}),
}
}
pub fn tick(&self, background_only: bool) -> bool {
let mut inner = self.inner.lock();
if !background_only {
if let Some(runnable) = inner.foreground_queue.pop_front() {
drop(inner); // Unlock while running
runnable.run();
return true;
}
}
false
}
pub fn run(&self) {
while self.tick(false) {}
}
}
impl Scheduler for TestScheduler {
fn schedule(&self, runnable: Runnable, _label: Option<TaskLabel>) {
runnable.run();
}
fn schedule_foreground(&self, runnable: Runnable, _label: Option<TaskLabel>) {
self.inner.lock().foreground_queue.push_back(runnable);
}
fn is_main_thread(&self) -> bool {
thread::current().id() == self.inner.lock().creation_thread_id
}
}
pub struct ForegroundExecutor {
scheduler: Arc<dyn Scheduler>,
_phantom: PhantomData<()>,
}
impl ForegroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
Ok(Self {
scheduler,
_phantom: PhantomData,
})
}
pub fn spawn<R: 'static>(&self, future: impl Future<Output = R> + 'static) -> Task<R> {
let scheduler = self.scheduler.clone();
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
scheduler.schedule_foreground(runnable, None);
});
runnable.schedule();
Task(task)
}
pub fn spawn_labeled<R: 'static>(
&self,
future: impl Future<Output = R> + 'static,
label: TaskLabel,
) -> Task<R> {
let scheduler = self.scheduler.clone();
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
scheduler.schedule_foreground(runnable, Some(label));
});
runnable.schedule();
Task(task)
}
}
pub struct BackgroundExecutor {
scheduler: Arc<dyn Scheduler>,
}
impl BackgroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
Ok(Self { scheduler })
}
pub fn spawn<R: 'static + Send>(
&self,
future: impl Future<Output = R> + Send + 'static,
) -> Task<R> {
let scheduler = self.scheduler.clone();
let (runnable, task) = async_task::spawn(future, move |runnable| {
scheduler.schedule_foreground(runnable, None);
});
runnable.schedule();
Task(task)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[test]
fn test_basic_spawn_and_run() {
let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
let executor = ForegroundExecutor::new(scheduler.clone()).unwrap();
let flag = Arc::new(AtomicBool::new(false));
assert!(!flag.load(Ordering::SeqCst));
let _task = executor.spawn({
let flag = flag.clone();
async move {
flag.store(true, Ordering::SeqCst);
}
});
assert!(!flag.load(Ordering::SeqCst));
scheduler.run();
assert!(flag.load(Ordering::SeqCst));
}
#[test]
fn test_background_task_with_foreground_wait() {
let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
// Create a oneshot channel to send data from background to foreground
let (tx, rx) = oneshot::channel();
// Spawn background task that sends 42
let bg_executor = BackgroundExecutor::new(scheduler.clone()).unwrap();
let _background_task = bg_executor.spawn(async move {
tx.send(42).unwrap();
});
// Run all tasks
scheduler.run();
// Block on receiving the value from the background task
let received = executor::block_on(rx).unwrap();
// Assert on the result
assert_eq!(received, 42);
}
}

View File

@@ -0,0 +1,520 @@
# TestScheduler Design Details
This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. **Updates incorporate Executor and ForegroundExecutor wrappers around Arc<dyn Scheduler>, with ForegroundExecutor using PhantomData<Rc<()>> for !Send and panicking if not on the creation thread, applied to both GPUI and Cloud for consistency and simplicity.**
## Overview
The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:
- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **ForegroundExecutor is now used in Cloud for single-threaded simplicity, avoiding Send requirements on futures.**
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.
There is **no separate TestScheduler trait**—all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. **Executors wrap Arc<dyn Scheduler>, with ForegroundExecutor enforcing thread safety via phantom Rc and creation-thread checks.**
## Design Principles
- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded with configurable randomization.
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses Executor and ForegroundExecutor).
- **Thread Safety Enforcement**: ForegroundExecutor uses phantom Rc for !Send and checks against creation thread for main-thread isolation.
## Complete TestScheduler Implementation
```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use parking_lot::{Mutex as ParkingMutex, MutexGuard};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
pub enum ThreadTarget { Main, Background }
pub enum VariationSource {
/// Seeded randomness for reproducible probabilistic scheduling.
Seeded(u64),
/// Fuzzed inputs for deterministic exploration of variations.
Fuzzed(SchedulerFuzzInput),
}
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
/// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
pub randomize_order: bool,
/// Max steps before panic (for deadlock detection).
pub max_steps: usize,
/// Whether to log operations (for debugging).
pub log_operations: bool,
/// The source of non-deterministic decisions (mutually exclusive modes).
pub variation_source: VariationSource,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
randomize_order: true,
max_steps: 10000,
log_operations: false,
variation_source: VariationSource::Seeded(0),
}
}
}
impl SchedulerConfig {
/// Create a seeded config (randomization enabled by default).
pub fn seeded(seed: u64) -> Self {
Self {
variation_source: VariationSource::Seeded(seed),
..Default::default()
}
}
/// Create a fuzzed config (randomization enabled by default).
pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
Self {
variation_source: VariationSource::Fuzzed(fuzz_inputs),
..Default::default()
}
}
/// For isolation: Disable randomization for deterministic testing.
pub fn deterministic(seed: u64) -> Self {
Self {
randomize_order: false,
variation_source: VariationSource::Seeded(seed),
..Default::default()
}
}
}
#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
queue_selections: Vec<u8>,
task_indices: Vec<u32>,
delay_bools: Vec<bool>,
block_ticks: Vec<usize>,
}
pub struct Task<T> {
id: TaskId,
rx: oneshot::Receiver<T>,
scheduler: Arc<TestScheduler>,
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.rx.try_recv() {
Ok(Some(val)) => Poll::Ready(val),
_ => { cx.waker().wake_by_ref(); Poll::Pending }
}
}
}
// Internal state
struct TaskInfo {
label: Option<TaskLabel>,
session_id: Option<SessionId>,
waker: Option<Waker>,
future: Option<Pin<Box<dyn Future<Output = ()>>>>,
state: TaskState,
delay_range: Option<(Instant, Instant)>,
}
#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }
struct WorkerSession {
spawned_tasks: HashSet<TaskId>,
wait_until_tasks: HashSet<TaskId>,
}
struct SchedulerState {
rng: ChaCha8Rng,
randomize_order: bool,
max_steps: usize,
current_time: Instant,
next_task_id: AtomicUsize,
tasks: HashMap<TaskId, TaskInfo>,
sessions: HashMap<SessionId, WorkerSession>,
current_session: Option<SessionId>,
ready_queue: VecDeque<TaskId>,
deprioritized_queue: VecDeque<TaskId>,
main_thread_queue: VecDeque<TaskId>,
delayed: Vec<(Instant, TaskId)>,
deprioritized_labels: HashSet<TaskLabel>,
block_tick_range: std::ops::RangeInclusive<usize>,
parker: parking_lot::Parker,
unparker: parking_lot::Unparker,
parking_allowed: AtomicBool,
execution_history: Vec<String>,
fuzz_inputs: Option<FuzzedSchedulerInputs>,
creation_thread_id: thread::ThreadId, // Added for thread safety checks
}
// Concrete implementation
pub struct TestScheduler {
state: Arc<Mutex<SchedulerState>>,
}
impl TestScheduler {
/// Primary constructor: Create a scheduler from full configuration.
pub fn new(config: SchedulerConfig) -> Arc<Self> {
let (parker, unparker) = parking_lot::pair();
let (rng, fuzz_inputs) = match config.variation_source {
VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
};
let state = SchedulerState {
rng,
randomize_order: config.randomize_order,
max_steps: config.max_steps,
current_time: Instant::now(),
next_task_id: AtomicUsize::new(1),
tasks: HashMap::new(),
sessions: HashMap::new(),
current_session: None,
ready_queue: VecDeque::new(),
deprioritized_queue: VecDeque::new(),
main_thread_queue: VecDeque::new(),
delayed: Vec::new(),
deprioritized_labels: HashSet::new(),
block_tick_range: 0..=1000,
parker,
unparker,
parking_allowed: AtomicBool::new(false),
execution_history: Vec::new(),
fuzz_inputs,
creation_thread_id: thread::current().id(), // Capture creation thread
};
Arc::new(Self { state: Arc::new(Mutex::new(state)) })
}
// Added for ForegroundExecutor thread checks
pub fn assert_main_thread(&self) {
let state = self.state.lock().unwrap();
if thread::current().id() != state.creation_thread_id {
panic!("ForegroundExecutor method called from wrong thread");
}
}
/// Convenience helper: Create a seeded scheduler (randomization enabled by default).
pub fn from_seed(seed: u64) -> Arc<Self> {
Self::new(SchedulerConfig::seeded(seed))
}
/// Convenience helper: Create a scheduler driven by fuzzed inputs.
/// Use for fuzzing with Bolero.
pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
}
// Test-specific methods (direct on impl, no trait)
pub fn assign_task_id(&self) -> TaskId {
TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
}
pub fn deprioritize(&self, label: TaskLabel) {
let mut state = self.state.lock().unwrap();
state.deprioritized_labels.insert(label);
}
pub fn is_task_running(&self, task_id: TaskId) -> bool {
let state = self.state.lock().unwrap();
state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
}
pub fn tick(&self, background_only: bool) -> bool {
self.tick_internal(background_only)
}
fn tick_internal(&self, background_only: bool) -> bool {
// Process delays first (drop lock before polling)
{
let mut state = self.state.lock().unwrap();
state.delayed.retain(|&(time, task_id)| {
if time <= state.current_time && (!state.randomize_order || state.rng.gen_bool(0.5)) {
state.ready_queue.push_back(task_id);
false
} else { true }
});
} // Lock dropped here
// Select and poll task without lock held
let task_to_poll = {
let mut state = self.state.lock().unwrap();
let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
if !background_only { queues.insert(0, &mut state.main_thread_queue); }
let mut available: Vec<usize> = queues.iter().enumerate()
.filter(|&(_, q)| !q.is_empty())
.map(|(i, _)| i)
.collect();
if available.is_empty() { return false; }
if state.randomize_order { available.shuffle(&mut state.rng); }
let queue_ix = available[0];
let task_id = queues[queue_ix].pop_front().unwrap();
Some(task_id)
}; // Lock dropped here
// Poll the task's future outside the lock
let poll_result = {
let mut state = self.state.lock().unwrap();
if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) {
task.state = TaskState::Running;
if let Some(fut) = task.future.as_mut() {
if let Some(waker) = task.waker.as_ref() {
let mut context = Context::from_waker(waker);
fut.as_mut().poll(&mut context)
} else {
Poll::Pending
}
} else {
Poll::Pending
}
} else {
Poll::Pending
}
}; // Lock dropped here
// Update task state after polling
if poll_result.is_ready() {
let mut state = self.state.lock().unwrap();
if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) {
task.state = TaskState::Completed;
state.execution_history.push(format!("Ticked task {}", task_to_poll.unwrap().0));
}
}
true
}
pub fn advance_clock(&self, duration: Duration) {
let mut state = self.state.lock().unwrap();
state.current_time += duration;
}
pub fn run_until_parked(&self) {
while self.tick(false) {}
}
// Cloud session methods
pub fn create_session(&self) -> SessionId {
let mut state = self.state.lock().unwrap();
let id = SessionId(state.sessions.len());
state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
id
}
pub fn set_current_session(&self, session_id: Option<SessionId>) {
let mut state = self.state.lock().unwrap();
state.current_session = session_id;
}
pub fn get_current_session(&self) -> Option<SessionId> {
self.state.lock().unwrap().current_session
}
pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id) {
session.spawned_tasks.insert(task_id);
}
}
pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id) {
session.wait_until_tasks.insert(task_id);
}
}
pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
let state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get(&session_id) {
let dangling: Vec<_> = session.spawned_tasks.iter()
.filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
.filter(|&&tid| !session.wait_until_tasks.contains(&tid))
.cloned()
.collect();
if !dangling.is_empty() {
return Err(anyhow!("{} dangling tasks", dangling.len()));
}
}
Ok(())
}
// Other test methods (e.g., GPUI block simulation)
pub fn gen_block_on_ticks(&self) -> usize {
let state = self.state.lock().unwrap();
state.rng.gen_range(state.block_on_ticks_range.clone())
}
}
```
## GPUI Usage Example
GPUI wraps the TestScheduler using Executor and ForegroundExecutor:
```rust
use std::marker::PhantomData;
use std::rc::Rc;
// Generic Executor for background tasks (Send futures)
pub struct Executor {
scheduler: Arc<dyn Scheduler>,
}
impl Executor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
where R: Send + 'static {
// Delegate to scheduler via downcast for test methods if applicable
if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
// Use test_sched methods
}
self.scheduler.spawn(future)
}
pub fn spawn_labeled<R>(
&self,
label: TaskLabel,
future: impl Future<Output = R> + Send + 'static
) -> Task<R>
where R: Send + 'static {
if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.deprioritize(label);
}
self.scheduler.spawn_labeled(label, future)
}
pub fn deprioritize(&self, label: TaskLabel) {
if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.deprioritize(label);
}
}
pub fn timer(&self, duration: Duration) -> Task<()> {
self.scheduler.timer(duration)
}
pub fn tick(&self) -> Option<bool> {
self.scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.tick(false))
}
}
// ForegroundExecutor for main-thread tasks (!Send futures, thread checks)
pub struct ForegroundExecutor {
executor: Executor,
_phantom: PhantomData<Rc<()>>, // Enforces !Send
}
impl !Send for ForegroundExecutor {} // Explicitly !Send
impl ForegroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
let executor = Executor::new(scheduler);
// Check thread immediately via scheduler
if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.assert_main_thread();
} else {
// Production: assume created on main thread
}
Ok(Self { executor, _phantom: PhantomData })
}
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
where R: 'static {
// Assert thread before delegating
if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.assert_main_thread();
}
self.executor.scheduler.spawn_foreground(future)
}
pub fn timer(&self, duration: Duration) -> Task<()> {
if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.assert_main_thread();
}
self.executor.scheduler.timer(duration)
}
// Other methods mirror Executor but with thread checks
}
```
## Cloud Usage Example
Cloud wraps using ForegroundExecutor for single-threaded simplicity (no Send futures required):
```rust
impl SimulatedExecutionContext {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
let fg_executor = ForegroundExecutor::new(scheduler)?; // Use ForegroundExecutor for thread safety and simplicity
Self {
executor: fg_executor,
session_counter: AtomicUsize::new(0),
sessions: Mutex::new(HashMap::new()),
current_session: Mutex::new(None),
}
}
pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
let task = self.executor.spawn(async move { future.await })?;
// Direct use of TestScheduler methods via downcast from executor
if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
if let Some(session_id) = test_sched.get_current_session() {
test_sched.track_task_for_session(task.id(), session_id);
test_sched.add_wait_until_task(session_id, task.id());
}
}
Ok(())
}
}
```
## Key Dependencies and Assumptions
- **parking_lot**: For `Mutex` (not RwLock) and `Parker`/`Unparker`.
- **async_task**: For Runnable wrapping.
- **rand_chacha**: For seeded RNG.
- **futures**: For channels.
- **chrono**: For time ranges (optional).
- **anyhow**: For errors.
- **std::thread**: For thread ID comparison.
The scheduler assumes no `dyn Any` is implemented on `Scheduler`; add `fn as_any(&self) -> &dyn std::any::Any;` if needed for downcasting.
This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety.

View File

@@ -0,0 +1,460 @@
┌─────────────────────────────────────────┐
│ Shared Crate │
├─────────────────────────────────────────┤
│ Scheduler trait: │
│ - Core object-safe interface │
│ │
│ TestScheduler: │
│ - Should implement Scheduler + test features │
│ - deprioritize() - test-only method │
│ - spawn_labeled() - labels for testing │
│ - Task lifecycle tracking │
│ - creation_thread_id for Foreground checks│
│ │
│ Executor wrappers: │
│ - Executor: Wraps Arc<dyn Scheduler>, Send futures│
│ - ForegroundExecutor: Wraps Arc<dyn Scheduler>, !Send, thread checks│
└─────────────────────────────────────────┘
┌─────────┼─────────┐
│ │ │
┌─────────┴────┐ ┌─┴─────────┴────┐
│ GPUI │ │ Cloud │
│ Uses Executor│ │ ForegroundExec │
│ + Foreground │ │ for single-thrd│
│ + TestScheduler│ │ + TestScheduler│
└──────────────┘ └─────────────────┘
```
## GPUI Integration
### Platform Scheduler Implementations in GPUI
Platform-specific scheduler implementations **should remain in GPUI's platform modules**:
- **MacDispatcher**: Should implement `Scheduler` trait, uses GCD APIs
- **LinuxDispatcher**: Should implement `Scheduler` trait, uses thread pools + calloop
- **WindowsDispatcher**: Should implement `Scheduler` trait, uses Windows ThreadPool APIs
**Rationale**: Platform implementations are substantial and deeply integrated with:
- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
- GPUI's event loop integration (main thread messaging)
- Platform-specific performance optimizations
The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**
### BackgroundExecutor Integration
GPUI's executors now use wrappers:
```rust
// crates/gpui/src/executor.rs
pub struct BackgroundExecutor {
executor: Executor, // Generic wrapper for background tasks (Send futures)
}
impl BackgroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { executor: Executor::new(scheduler) }
}
// Core spawning methods via wrapper delegation
pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
where R: Send + 'static {
self.executor.spawn(future)
}
pub fn spawn_labeled<R>(
&self,
label: TaskLabel,
future: impl Future<Output = R> + Send + 'static
) -> Task<R>
where R: Send + 'static {
self.executor.spawn_labeled(label, future)
}
// Timer functionality via wrapper
pub fn timer(&self, duration: Duration) -> Task<()> {
self.executor.timer(duration)
}
// Test-specific methods via downcast in wrapper
pub fn deprioritize(&self, label: TaskLabel) {
self.executor.deprioritize(label);
}
pub fn tick(&self) -> Option<bool> {
self.executor.tick()
}
}
```
### ForegroundExecutor Integration
GPUI's foreground executor enforces main-thread usage:
```rust
// crates/gpui/src/executor.rs
pub struct ForegroundExecutor {
executor: Executor, // Underlying executor for delegation
_phantom: PhantomData<Rc<()>>, // Enforces !Send
creation_thread_id: ThreadId, // Stored for checks
}
impl !Send for ForegroundExecutor {} // Explicitly !Send
impl ForegroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
let creation_thread_id = thread::current().id();
// Delegate creation to underlying scheduler
let _ = Executor::new(scheduler.clone());
Ok(Self { executor: Executor::new(scheduler), _phantom: PhantomData, creation_thread_id })
}
// Core spawning for main thread (non-Send futures)
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
where R: 'static {
if thread::current().id() != self.creation_thread_id {
panic!("ForegroundExecutor called off main thread");
}
// Delegate to scheduler.spawn_foreground via wrapper
self.executor.scheduler.spawn_foreground(future)
}
// Timer and test methods same as BackgroundExecutor but with thread checks
pub fn timer(&self, duration: Duration) -> Task<()> {
if thread::current().id() != self.creation_thread_id {
panic!("ForegroundExecutor called off main thread");
}
self.executor.timer(duration)
}
}
```
## Cloud Integration
### Session Coordination in SimulatedExecutionContext
Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **Uses ForegroundExecutor for single-threaded consistency and to avoid Send requirements on futures.**
```rust
// crates/platform_simulator/src/platform.rs
pub struct SimulatedExecutionContext {
fg_executor: ForegroundExecutor, // Single-threaded wrapper for simplicity
session_counter: AtomicUsize,
sessions: Mutex<HashMap<SessionId, WorkerSession>>,
current_session: Mutex<Option<SessionId>>,
}
#[async_trait(?Send)]
impl PlatformRuntime for SimulatedExecutionContext {
async fn delay(&self, duration: Duration) {
// Use wrapper's timer for delay
self.fg_executor.timer(duration).await;
}
}
```
### Wait Until Implementation
Session coordination integrated directly with wrapper's task scheduling:
```rust
#[async_trait(?Send)]
impl ExecutionContext for SimulatedExecutionContext {
fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
// 1. Spawn using wrapper (no Send required)
let task = self.fg_executor.spawn(async move { future.await })?;
// 2. Register with session coordination via downcast
if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
if let Some(session_id) = test_sched.get_current_session() {
test_sched.track_task_for_session(task.id(), session_id);
test_sched.add_wait_until_task(session_id, task.id());
}
}
Ok(())
}
async fn pass_through(&self) -> Result<()> {
// Use wrapper's timer for delay
self.fg_executor.timer(Duration::from_millis(10)).await;
Ok(())
}
}
```
### Session Coordination Methods
Core session operations handled within SimulatedExecutionContext:
```rust
impl SimulatedExecutionContext {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
let fg_executor = ForegroundExecutor::new(scheduler)?;
Ok(Self {
fg_executor,
session_counter: AtomicUsize::new(0),
sessions: Mutex::new(HashMap::new()),
current_session: Mutex::new(None),
})
}
pub fn create_session(&self) -> SessionId {
let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
let session_id = SessionId(session_counter);
self.sessions.lock().insert(session_id, WorkerSession {
spawned_tasks: HashSet::new(),
wait_until_task_ids: HashSet::new(),
});
session_id
}
pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
where
F: FnOnce() -> R,
{
{
let mut current = self.current_session.lock();
*current = Some(session_id);
}
let result = f();
{
let mut current = self.current_session.lock();
*current = None;
}
result
}
pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
let sessions = self.sessions.lock();
if let Some(session) = sessions.get(&session_id) {
// Check running tasks using wrapper's TestScheduler access
if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
let dangling_tasks: Vec<TaskId> = session
.spawned_tasks
.iter()
.filter(|&&task_id| test_sched.is_task_running(task_id))
.copied()
.collect();
// Cloud-specific permission check
let unauthorized: Vec<_> = dangling_tasks
.into_iter()
.filter(|task_id| !session.wait_until_task_ids.contains(task_id))
.collect();
if !unauthorized.is_empty() {
return Err(platform_api::WorkerError::Other(anyhow!(
"Session cleanup failed: {} unauthorized tasks still running",
unauthorized.len()
)));
}
}
}
Ok(())
}
pub fn set_current_session(&self, session_id: SessionId) {
*self.current_session.lock() = Some(session_id);
}
pub fn get_current_session(&self) -> Option<SessionId> {
*self.current_session.lock()
}
}
```
### Cloud-Specific Data Structures
Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers.
## Scheduler Trait Definition
```rust
pub trait Scheduler: Send + Sync {
/// Schedule a runnable to be executed (object-safe core functionality)
fn schedule(&self, runnable: Runnable);
/// Schedule a runnable with label for test tracking
fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
/// Schedule a runnable on the main thread (optional, defaults to panic)
fn schedule_foreground(&self, runnable: Runnable) {
panic!("schedule_foreground not supported by this scheduler");
}
/// Schedule a runnable after a delay (object-safe for timers)
fn schedule_after(&self, duration: Duration, runnable: Runnable);
/// Platform integration methods
fn park(&self, timeout: Option<Duration>) -> bool;
fn unparker(&self) -> Unparker;
fn is_main_thread(&self) -> bool;
fn now(&self) -> Instant;
}
## TestScheduler (Concrete Implementation)
```rust
pub struct TestScheduler {
inner: RefCell<TestSchedulerInner>,
}
struct TestSchedulerInner {
tasks: HashMap<TaskId, TaskState>,
task_labels: HashMap<TaskId, TaskLabel>,
deprioritized_labels: HashSet<TaskLabel>,
deprioritized_queue: VecDeque<(Runnable, TaskId)>,
main_thread_queue: VecDeque<Runnable>,
delayed: Vec<(Instant, Runnable)>,
parker: Parker,
is_main_thread: bool,
now: Instant,
next_task_id: AtomicUsize,
rng: StdRng,
waiting_tasks: HashSet<TaskId>,
parking_allowed: bool,
waiting_hint: Option<String>,
block_tick_range: std::ops::RangeInclusive<usize>,
creation_thread_id: ThreadId, // Added for wrapper checks
}
impl Scheduler for TestScheduler {
fn schedule(&self, runnable: Runnable) {
// Implementation as before
}
fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
// Implementation as before, with deprioritization
}
fn schedule_foreground(&self, runnable: Runnable) {
assert!(thread::current().id() == self.inner.borrow().creation_thread_id, "schedule_foreground called off main thread");
// Implementation as before
}
// Other trait methods unchanged
}
impl TestScheduler {
// Test-specific methods (NOT on main trait)
pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
pub fn tick(&self) -> bool { /* implementation */ }
pub fn run_until_parked(&self) { /* implementation */ }
pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
pub fn assert_main_thread(&self) { /* implementation */ }
}
```
## Generic Spawn Helpers
Generic spawn methods implemented for `dyn Scheduler`, now called by wrappers.
## Migration Strategy
### Phase 1: Core Infrastructure
1. Define `Scheduler` trait (core methods only)
2. Implement `TestScheduler` with thread ID tracking
3. Add wrapper structs `Executor` and `ForegroundExecutor`
4. Make existing GPUI platform dispatchers implement `Scheduler` trait
5. Add `as_any()` to `Scheduler` for downcasting
### Phase 2: GPUI Migration
1. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers
2. Handle downcasting in wrappers for test features
3. Preserve all existing GPUI functionality
4. Test deployments use TestScheduler, production uses minimal schedulers
### Phase 3: Cloud Integration
1. Update `SimulatedExecutionContext` to use `ForegroundExecutor`
2. Move session coordination logic into `SimulatedExecutionContext`
3. Integrate `wait_until()` with wrapper scheduling
4. Use TestScheduler features for session validation via downcast
5. Preserve all existing Cloud platform APIs
### Phase 4: Testing & Validation
1. GPUI tests work with new architecture
2. Cloud session behavior preserved (single-threaded)
3. Production efficiency maintained
4. Both domains benefit from unified test infrastructure
## Platform Backend Files
### GPUI Backends
- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher``TestScheduler` (moved to shared crate)
### Cloud Backends
- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor`
## Compatibility Checklist
## Complete GPUI + Cloud Feature Coverage ✅
### GPUI Compatibility
-`spawn()``Executor::spawn()` or `ForegroundExecutor::spawn()`
-`spawn_labeled(label)` → Wrappers delegate to `dyn Scheduler::spawn_labeled()`
-`timer(duration)` → Wrappers delegate to `dyn Scheduler::timer()`
-`block(future)` → Wrappers handle with parking
-`block_with_timeout(future, timeout)` → Wrappers handle
-`now()``scheduler.now()` (direct trait method)
-`is_main_thread()``scheduler.is_main_thread()` (direct trait method)
-`num_cpus()` → Generic helper on wrappers
-`deprioritize(label)` → Downcast in wrappers, then TestScheduler::deprioritize()
-`tick()` → Downcast in wrappers, then TestScheduler::tick()
-`run_until_parked()` → Downcast in wrappers, then TestScheduler::run_until_parked()
-`advance_clock(duration)` → Downcast in wrappers, then TestScheduler::advance_clock()
-`simulate_random_delay()` → Downcast in wrappers, then TestScheduler::simulate_random_delay()
-`BackgroundExecutor` → Uses `Executor` wrapper
### Cloud Compatibility
-**Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor`
-**Task Lifecycle**: Uses wrapper's TestScheduler access for validation
-**Worker Management**: Session context and cleanup validation
-**Background Tasks**: Explicit permission system for long-running work
-**Deterministic Testing**: Full TestScheduler integration with session tracking
-**Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer
-**Session Validation**: Dangling task detection with proper error reporting
-**Auto-Association**: Tasks automatically linked to sessions during spawn
### Unified Benefits
-**Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
-**Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation
-**Composability**: Session coordination built on unified scheduling primitives
-**Domain-Specific**: Each domain handles its coordination concerns appropriately
-**Test Infrastructure**: Shared deterministic testing capabilities
-**Production Ready**: Both domains can use minimal platform schedulers
-**Extensible**: New coordination patterns can be added without shared crate changes
-**Thread Safety**: ForegroundExecutor enforces main-thread use across domains
## Implementation Notes
### Key Design Decisions
1. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send)
2. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
3. **Shared**: Core scheduling primitives + wrappers for delegation and safety
4. **Integration**: Both domains use wrappers with consistent API
### Migration Considerations
- **Zero Breaking Changes**: Existing APIs preserved via wrappers
- **Gradual Migration**: Can migrate GPUI and Cloud independently
- **Test Preservation**: All existing test functionality maintained
- **Performance**: Minimal overhead from trait objects in production
- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in single-threaded context
This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.