Compare commits

12 Commits: test-winge ... scheduler

| SHA1 |
|---|
| 98db674dba |
| b303a76922 |
| 588237ce06 |
| 571693795e |
| d3d1be4fba |
| 1c1649dc61 |
| 26d6d825a8 |
| 28bd412d62 |
| 263eae363d |
| eac2ce7d84 |
| de19c379f9 |
| 633345d04b |
**Cargo.lock** (generated), 12 lines changed
@@ -14349,6 +14349,18 @@ dependencies = [

```toml
 "windows-sys 0.59.0",
]

[[package]]
name = "scheduler"
version = "0.1.0"
dependencies = [
 "anyhow",
 "async-task",
 "futures 0.3.31",
 "parking_lot",
 "rand 0.8.5",
 "rand_chacha 0.3.1",
]

[[package]]
name = "schema_generator"
version = "0.1.0"
```
**Cargo.toml** (workspace members)

@@ -140,6 +140,7 @@ members = [

```toml
    "crates/rpc",
    "crates/rules_library",
    "crates/schema_generator",
    "crates/scheduler",
    "crates/search",
    "crates/semantic_index",
    "crates/semantic_version",
```
**crates/scheduler/Cargo.toml** (new file, 18 lines)
@@ -0,0 +1,18 @@

```toml
[package]
name = "scheduler"
version = "0.1.0"
edition = "2021"

[lib]
path = "src/scheduler.rs"

[dependencies]
anyhow = "1.0"
async-task = "4.5"
futures = "0.3"
rand = { version = "0.8", features = ["small_rng"] }
rand_chacha = "0.3"
parking_lot = "0.12"

[features]
test-support = []
```
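For orientation, a downstream workspace crate would typically enable the `test-support` feature only for its tests. A minimal sketch of such a manifest follows; the crate name and relative path are hypothetical and not part of this diff.

```toml
# Hypothetical consuming crate's Cargo.toml (illustrative only).
[dependencies]
scheduler = { path = "../scheduler" }

[dev-dependencies]
# Pull in the test-only surface (e.g. deterministic test helpers) just for tests.
scheduler = { path = "../scheduler", features = ["test-support"] }
```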
**crates/scheduler/src/scheduler.rs** (new file, 215 lines)
@@ -0,0 +1,215 @@

```rust
use anyhow::Result;
use async_task::Runnable;
use parking_lot::Mutex;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::any::Any;
use std::collections::VecDeque;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;

use futures::channel::oneshot;
use futures::executor;
use std::thread::{self, ThreadId};

#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

pub trait Scheduler: Send + Sync + Any {
    fn schedule(&self, runnable: Runnable, label: Option<TaskLabel>);
    fn schedule_foreground(&self, runnable: Runnable, label: Option<TaskLabel>);
    fn is_main_thread(&self) -> bool;
}

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

pub struct Task<R>(async_task::Task<R>);

impl<R> Task<R> {
    pub fn id(&self) -> TaskId {
        TaskId(0) // Placeholder
    }
}

impl Default for TaskLabel {
    fn default() -> Self {
        TaskLabel(0)
    }
}

pub struct SchedulerConfig {
    pub randomize_order: bool,
    pub seed: u64,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            seed: 0,
        }
    }
}

pub struct TestScheduler {
    inner: Mutex<TestSchedulerInner>,
}

struct TestSchedulerInner {
    rng: ChaCha8Rng,
    foreground_queue: VecDeque<Runnable>,
    creation_thread_id: ThreadId,
}

impl TestScheduler {
    pub fn new(config: SchedulerConfig) -> Self {
        Self {
            inner: Mutex::new(TestSchedulerInner {
                rng: ChaCha8Rng::seed_from_u64(config.seed),
                foreground_queue: VecDeque::new(),
                creation_thread_id: thread::current().id(),
            }),
        }
    }

    pub fn tick(&self, background_only: bool) -> bool {
        let mut inner = self.inner.lock();
        if !background_only {
            if let Some(runnable) = inner.foreground_queue.pop_front() {
                drop(inner); // Unlock while running
                runnable.run();
                return true;
            }
        }
        false
    }

    pub fn run(&self) {
        while self.tick(false) {}
    }
}

impl Scheduler for TestScheduler {
    fn schedule(&self, runnable: Runnable, _label: Option<TaskLabel>) {
        runnable.run();
    }

    fn schedule_foreground(&self, runnable: Runnable, _label: Option<TaskLabel>) {
        self.inner.lock().foreground_queue.push_back(runnable);
    }

    fn is_main_thread(&self) -> bool {
        thread::current().id() == self.inner.lock().creation_thread_id
    }
}

pub struct ForegroundExecutor {
    scheduler: Arc<dyn Scheduler>,
    _phantom: PhantomData<()>,
}

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        Ok(Self {
            scheduler,
            _phantom: PhantomData,
        })
    }

    pub fn spawn<R: 'static>(&self, future: impl Future<Output = R> + 'static) -> Task<R> {
        let scheduler = self.scheduler.clone();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            scheduler.schedule_foreground(runnable, None);
        });
        runnable.schedule();
        Task(task)
    }

    pub fn spawn_labeled<R: 'static>(
        &self,
        future: impl Future<Output = R> + 'static,
        label: TaskLabel,
    ) -> Task<R> {
        let scheduler = self.scheduler.clone();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            scheduler.schedule_foreground(runnable, Some(label));
        });
        runnable.schedule();
        Task(task)
    }
}

pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>,
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        Ok(Self { scheduler })
    }

    pub fn spawn<R: 'static + Send>(
        &self,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R> {
        let scheduler = self.scheduler.clone();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            scheduler.schedule_foreground(runnable, None);
        });
        runnable.schedule();
        Task(task)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[test]
    fn test_basic_spawn_and_run() {
        let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
        let executor = ForegroundExecutor::new(scheduler.clone()).unwrap();

        let flag = Arc::new(AtomicBool::new(false));
        assert!(!flag.load(Ordering::SeqCst));
        let _task = executor.spawn({
            let flag = flag.clone();
            async move {
                flag.store(true, Ordering::SeqCst);
            }
        });

        assert!(!flag.load(Ordering::SeqCst));

        scheduler.run();

        assert!(flag.load(Ordering::SeqCst));
    }

    #[test]
    fn test_background_task_with_foreground_wait() {
        let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));

        // Create a oneshot channel to send data from background to foreground
        let (tx, rx) = oneshot::channel();

        // Spawn background task that sends 42
        let bg_executor = BackgroundExecutor::new(scheduler.clone()).unwrap();
        let _background_task = bg_executor.spawn(async move {
            tx.send(42).unwrap();
        });

        // Run all tasks
        scheduler.run();

        // Block on receiving the value from the background task
        let received = executor::block_on(rx).unwrap();

        // Assert on the result
        assert_eq!(received, 42);
    }
}
```
**thoughts/2025-08-31_15-47-17_test-scheduler-design.md** (new file, 520 lines)
@@ -0,0 +1,520 @@

# TestScheduler Design Details

This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and a complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. **Updates incorporate `Executor` and `ForegroundExecutor` wrappers around `Arc<dyn Scheduler>`; `ForegroundExecutor` uses `PhantomData<Rc<()>>` to be `!Send` and panics when used off its creation thread. This applies to both GPUI and Cloud for consistency and simplicity.**

## Overview

The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:

- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **ForegroundExecutor is now used in Cloud for single-threaded simplicity, avoiding `Send` requirements on futures.**
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.

There is **no separate TestScheduler trait**: all test-specific methods live directly on the TestScheduler struct for simplicity, since it is the only implementation in the test context. **Executors wrap `Arc<dyn Scheduler>`, with ForegroundExecutor enforcing thread safety via a phantom `Rc` and creation-thread checks.**

## Design Principles

- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded, with configurable randomization (see the short sketch after this list).
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses Executor and ForegroundExecutor).
- **Thread Safety Enforcement**: ForegroundExecutor uses a phantom `Rc` for `!Send` and checks against the creation thread for main-thread isolation.
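To make the determinism principle concrete, here is a minimal usage sketch built on the `SchedulerConfig::seeded` constructor and `run_until_parked` from the implementation below; the test body is illustrative only and not part of the plan.

```rust
#[test]
fn same_seed_same_interleaving() {
    // Two schedulers created from the same seed draw the same random decisions,
    // so a failing task interleaving can be replayed exactly by reusing the seed.
    let scheduler_a = TestScheduler::new(SchedulerConfig::seeded(42));
    let scheduler_b = TestScheduler::new(SchedulerConfig::seeded(42));

    // ... spawn the same set of tasks against both schedulers ...

    // Drain every queue; with identical seeds the execution order is identical.
    scheduler_a.run_until_parked();
    scheduler_b.run_until_parked();
}
```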
## Complete TestScheduler Implementation

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use parking_lot::{Mutex as ParkingMutex, MutexGuard};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;

// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

pub enum ThreadTarget { Main, Background }

pub enum VariationSource {
    /// Seeded randomness for reproducible probabilistic scheduling.
    Seeded(u64),
    /// Fuzzed inputs for deterministic exploration of variations.
    Fuzzed(SchedulerFuzzInput),
}

#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
    pub randomize_order: bool,
    /// Max steps before panic (for deadlock detection).
    pub max_steps: usize,
    /// Whether to log operations (for debugging).
    pub log_operations: bool,
    /// The source of non-deterministic decisions (mutually exclusive modes).
    pub variation_source: VariationSource,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            max_steps: 10000,
            log_operations: false,
            variation_source: VariationSource::Seeded(0),
        }
    }
}

impl SchedulerConfig {
    /// Create a seeded config (randomization enabled by default).
    pub fn seeded(seed: u64) -> Self {
        Self {
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }

    /// Create a fuzzed config (randomization enabled by default).
    pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
        Self {
            variation_source: VariationSource::Fuzzed(fuzz_inputs),
            ..Default::default()
        }
    }

    /// For isolation: Disable randomization for deterministic testing.
    pub fn deterministic(seed: u64) -> Self {
        Self {
            randomize_order: false,
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }
}

#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
    queue_selections: Vec<u8>,
    task_indices: Vec<u32>,
    delay_bools: Vec<bool>,
    block_ticks: Vec<usize>,
}

pub struct Task<T> {
    id: TaskId,
    rx: oneshot::Receiver<T>,
    scheduler: Arc<TestScheduler>,
}

impl<T> Future for Task<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.rx.try_recv() {
            Ok(Some(val)) => Poll::Ready(val),
            _ => { cx.waker().wake_by_ref(); Poll::Pending }
        }
    }
}

// Internal state
struct TaskInfo {
    label: Option<TaskLabel>,
    session_id: Option<SessionId>,
    waker: Option<Waker>,
    future: Option<Pin<Box<dyn Future<Output = ()>>>>,
    state: TaskState,
    delay_range: Option<(Instant, Instant)>,
}

#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }

struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_tasks: HashSet<TaskId>,
}

struct SchedulerState {
    rng: ChaCha8Rng,
    randomize_order: bool,
    max_steps: usize,
    current_time: Instant,
    next_task_id: AtomicUsize,
    tasks: HashMap<TaskId, TaskInfo>,
    sessions: HashMap<SessionId, WorkerSession>,
    current_session: Option<SessionId>,
    ready_queue: VecDeque<TaskId>,
    deprioritized_queue: VecDeque<TaskId>,
    main_thread_queue: VecDeque<TaskId>,
    delayed: Vec<(Instant, TaskId)>,
    deprioritized_labels: HashSet<TaskLabel>,
    block_tick_range: std::ops::RangeInclusive<usize>,
    parker: parking::Parker,     // `parking` crate
    unparker: parking::Unparker, // `parking` crate
    parking_allowed: AtomicBool,
    execution_history: Vec<String>,
    fuzz_inputs: Option<SchedulerFuzzInput>,
    creation_thread_id: thread::ThreadId, // Added for thread safety checks
}

// Concrete implementation
pub struct TestScheduler {
    state: Arc<Mutex<SchedulerState>>,
}

impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
            creation_thread_id: thread::current().id(), // Capture creation thread
        };
        Arc::new(Self { state: Arc::new(Mutex::new(state)) })
    }

    // Added for ForegroundExecutor thread checks
    pub fn assert_main_thread(&self) {
        let state = self.state.lock().unwrap();
        if thread::current().id() != state.creation_thread_id {
            panic!("ForegroundExecutor method called from wrong thread");
        }
    }

    /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
    pub fn from_seed(seed: u64) -> Arc<Self> {
        Self::new(SchedulerConfig::seeded(seed))
    }

    /// Convenience helper: Create a scheduler driven by fuzzed inputs.
    /// Use for fuzzing with Bolero.
    pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
        Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
    }

    // Test-specific methods (direct on impl, no trait)
    pub fn assign_task_id(&self) -> TaskId {
        TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        let mut state = self.state.lock().unwrap();
        state.deprioritized_labels.insert(label);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        let state = self.state.lock().unwrap();
        state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
    }

    pub fn tick(&self, background_only: bool) -> bool {
        self.tick_internal(background_only)
    }

    fn tick_internal(&self, background_only: bool) -> bool {
        // Process delays first (drop lock before polling)
        {
            let mut state = self.state.lock().unwrap();
            state.delayed.retain(|&(time, task_id)| {
                if time <= state.current_time && (!state.randomize_order || state.rng.gen_bool(0.5)) {
                    state.ready_queue.push_back(task_id);
                    false
                } else { true }
            });
        } // Lock dropped here

        // Select and poll task without lock held
        let task_to_poll = {
            let mut state = self.state.lock().unwrap();
            let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
            if !background_only { queues.insert(0, &mut state.main_thread_queue); }

            let mut available: Vec<usize> = queues.iter().enumerate()
                .filter(|&(_, q)| !q.is_empty())
                .map(|(i, _)| i)
                .collect();

            if available.is_empty() { return false; }

            if state.randomize_order { available.shuffle(&mut state.rng); }

            let queue_ix = available[0];
            let task_id = queues[queue_ix].pop_front().unwrap();
            Some(task_id)
        }; // Lock dropped here

        // Poll the task's future outside the lock
        let poll_result = {
            let mut state = self.state.lock().unwrap();
            if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) {
                task.state = TaskState::Running;
                if let Some(fut) = task.future.as_mut() {
                    if let Some(waker) = task.waker.as_ref() {
                        let mut context = Context::from_waker(waker);
                        fut.as_mut().poll(&mut context)
                    } else {
                        Poll::Pending
                    }
                } else {
                    Poll::Pending
                }
            } else {
                Poll::Pending
            }
        }; // Lock dropped here

        // Update task state after polling
        if poll_result.is_ready() {
            let mut state = self.state.lock().unwrap();
            if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) {
                task.state = TaskState::Completed;
                state.execution_history.push(format!("Ticked task {}", task_to_poll.unwrap().0));
            }
        }

        true
    }

    pub fn advance_clock(&self, duration: Duration) {
        let mut state = self.state.lock().unwrap();
        state.current_time += duration;
    }

    pub fn run_until_parked(&self) {
        while self.tick(false) {}
    }

    // Cloud session methods
    pub fn create_session(&self) -> SessionId {
        let mut state = self.state.lock().unwrap();
        let id = SessionId(state.sessions.len());
        state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
        id
    }

    pub fn set_current_session(&self, session_id: Option<SessionId>) {
        let mut state = self.state.lock().unwrap();
        state.current_session = session_id;
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        self.state.lock().unwrap().current_session
    }

    pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.wait_until_tasks.insert(task_id);
        }
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        let state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get(&session_id) {
            let dangling: Vec<_> = session.spawned_tasks.iter()
                .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
                .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
                .cloned()
                .collect();
            if !dangling.is_empty() {
                return Err(anyhow!("{} dangling tasks", dangling.len()));
            }
        }
        Ok(())
    }

    // Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        let mut state = self.state.lock().unwrap();
        state.rng.gen_range(state.block_tick_range.clone())
    }
}
```
## GPUI Usage Example

GPUI wraps the TestScheduler using Executor and ForegroundExecutor:

```rust
use std::marker::PhantomData;
use std::rc::Rc;

// Generic Executor for background tasks (Send futures)
pub struct Executor {
    scheduler: Arc<dyn Scheduler>,
}

impl Executor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        // Delegate to scheduler via downcast for test methods if applicable
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            // Use test_sched methods
        }
        self.scheduler.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where R: Send + 'static {
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
        self.scheduler.spawn_labeled(label, future)
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.scheduler.timer(duration)
    }

    pub fn tick(&self) -> Option<bool> {
        self.scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.tick(false))
    }
}

// ForegroundExecutor for main-thread tasks (!Send futures, thread checks)
pub struct ForegroundExecutor {
    executor: Executor,
    _phantom: PhantomData<Rc<()>>, // Enforces !Send
}

impl !Send for ForegroundExecutor {} // Explicitly !Send

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let executor = Executor::new(scheduler);
        // Check thread immediately via scheduler
        if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        } else {
            // Production: assume created on main thread
        }
        Ok(Self { executor, _phantom: PhantomData })
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        // Assert thread before delegating
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.spawn_foreground(future)
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.timer(duration)
    }

    // Other methods mirror Executor but with thread checks
}
```
## Cloud Usage Example

Cloud wraps using ForegroundExecutor for single-threaded simplicity (no Send futures required):

```rust
impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let fg_executor = ForegroundExecutor::new(scheduler)?; // Use ForegroundExecutor for thread safety and simplicity
        Ok(Self {
            executor: fg_executor,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        })
    }

    pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        let task = self.executor.spawn(async move { future.await })?;

        // Direct use of TestScheduler methods via downcast from executor
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            if let Some(session_id) = test_sched.get_current_session() {
                test_sched.track_task_for_session(task.id(), session_id);
                test_sched.add_wait_until_task(session_id, task.id());
            }
        }

        Ok(())
    }
}
```
## Key Dependencies and Assumptions

- **parking_lot**: For `Mutex` (not `RwLock`).
- **parking**: For the `Parker`/`Unparker` pair.
- **async_task**: For Runnable wrapping.
- **rand_chacha**: For seeded RNG.
- **futures**: For channels.
- **chrono**: For time ranges (optional).
- **anyhow**: For errors.
- **std::thread**: For thread ID comparison.

The sketches assume the `Scheduler` trait does not yet expose an `as_any` accessor; add `fn as_any(&self) -> &dyn std::any::Any;` to the trait if downcasting is needed (see the sketch below).
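A minimal, self-contained sketch of that downcast pattern follows; the trait and struct here are trimmed stand-ins for the `Scheduler` trait and `TestScheduler` defined above, not the real definitions.

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in trait with only the accessor this sketch needs.
trait Scheduler: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

struct TestScheduler;

impl TestScheduler {
    // Test-only method that is deliberately not part of the object-safe trait.
    fn deprioritize(&self) {
        println!("deprioritized (test-only behavior)");
    }
}

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let scheduler: Arc<dyn Scheduler> = Arc::new(TestScheduler);
    // Wrappers recover test-only methods by downcasting through `as_any`.
    if let Some(test_scheduler) = scheduler.as_any().downcast_ref::<TestScheduler>() {
        test_scheduler.deprioritize();
    }
}
```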
This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety.
**thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md** (new file, 460 lines)
@@ -0,0 +1,460 @@

```
┌────────────────────────────────────────────────────────────────────────┐
│ Shared Crate                                                            │
├────────────────────────────────────────────────────────────────────────┤
│ Scheduler trait:                                                        │
│  - Core object-safe interface                                           │
│                                                                         │
│ TestScheduler:                                                          │
│  - Should implement Scheduler + test features                           │
│  - deprioritize() - test-only method                                    │
│  - spawn_labeled() - labels for testing                                 │
│  - Task lifecycle tracking                                              │
│  - creation_thread_id for Foreground checks                             │
│                                                                         │
│ Executor wrappers:                                                      │
│  - Executor: Wraps Arc<dyn Scheduler>, Send futures                     │
│  - ForegroundExecutor: Wraps Arc<dyn Scheduler>, !Send, thread checks   │
└────────────────────────────────────────────────────────────────────────┘
                                   ▲
                         ┌─────────┼─────────┐
                         │                   │
              ┌──────────┴─────┐   ┌─────────┴──────┐
              │ GPUI           │   │ Cloud          │
              │ Uses Executor  │   │ ForegroundExec │
              │ + Foreground   │   │ for single-thrd│
              │ + TestScheduler│   │ + TestScheduler│
              └────────────────┘   └────────────────┘
```
## GPUI Integration

### Platform Scheduler Implementations in GPUI

Platform-specific scheduler implementations **should remain in GPUI's platform modules**:

- **MacDispatcher**: Should implement the `Scheduler` trait; uses GCD APIs
- **LinuxDispatcher**: Should implement the `Scheduler` trait; uses thread pools + calloop
- **WindowsDispatcher**: Should implement the `Scheduler` trait; uses Windows ThreadPool APIs

**Rationale**: Platform implementations are substantial and deeply integrated with:
- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
- GPUI's event loop integration (main thread messaging)
- Platform-specific performance optimizations

The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**

### BackgroundExecutor Integration

GPUI's executors now use wrappers:

```rust
// crates/gpui/src/executor.rs
pub struct BackgroundExecutor {
    executor: Executor, // Generic wrapper for background tasks (Send futures)
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { executor: Executor::new(scheduler) }
    }

    // Core spawning methods via wrapper delegation
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        self.executor.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where R: Send + 'static {
        self.executor.spawn_labeled(label, future)
    }

    // Timer functionality via wrapper
    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.executor.timer(duration)
    }

    // Test-specific methods via downcast in wrapper
    pub fn deprioritize(&self, label: TaskLabel) {
        self.executor.deprioritize(label);
    }

    pub fn tick(&self) -> Option<bool> {
        self.executor.tick()
    }
}
```

### ForegroundExecutor Integration

GPUI's foreground executor enforces main-thread usage:

```rust
// crates/gpui/src/executor.rs
pub struct ForegroundExecutor {
    executor: Executor, // Underlying executor for delegation
    _phantom: PhantomData<Rc<()>>, // Enforces !Send
    creation_thread_id: ThreadId, // Stored for checks
}

impl !Send for ForegroundExecutor {} // Explicitly !Send

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let creation_thread_id = thread::current().id();
        Ok(Self {
            executor: Executor::new(scheduler),
            _phantom: PhantomData,
            creation_thread_id,
        })
    }

    // Core spawning for main thread (non-Send futures)
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        if thread::current().id() != self.creation_thread_id {
            panic!("ForegroundExecutor called off main thread");
        }
        // Delegate to scheduler.spawn_foreground via wrapper
        self.executor.scheduler.spawn_foreground(future)
    }

    // Timer and test methods same as BackgroundExecutor but with thread checks
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if thread::current().id() != self.creation_thread_id {
            panic!("ForegroundExecutor called off main thread");
        }
        self.executor.timer(duration)
    }
}
```
## Cloud Integration

### Session Coordination in SimulatedExecutionContext

Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **It uses ForegroundExecutor for single-threaded consistency and to avoid `Send` requirements on futures.**

```rust
// crates/platform_simulator/src/platform.rs
pub struct SimulatedExecutionContext {
    fg_executor: ForegroundExecutor, // Single-threaded wrapper for simplicity
    session_counter: AtomicUsize,
    sessions: Mutex<HashMap<SessionId, WorkerSession>>,
    current_session: Mutex<Option<SessionId>>,
}

#[async_trait(?Send)]
impl PlatformRuntime for SimulatedExecutionContext {
    async fn delay(&self, duration: Duration) {
        // Use wrapper's timer for delay
        self.fg_executor.timer(duration).await;
    }
}
```

### Wait Until Implementation

Session coordination integrates directly with the wrapper's task scheduling:

```rust
#[async_trait(?Send)]
impl ExecutionContext for SimulatedExecutionContext {
    fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        // 1. Spawn using wrapper (no Send required)
        let task = self.fg_executor.spawn(async move { future.await })?;

        // 2. Register with session coordination via downcast
        if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            if let Some(session_id) = test_sched.get_current_session() {
                test_sched.track_task_for_session(task.id(), session_id);
                test_sched.add_wait_until_task(session_id, task.id());
            }
        }

        Ok(())
    }

    async fn pass_through(&self) -> Result<()> {
        // Use wrapper's timer for delay
        self.fg_executor.timer(Duration::from_millis(10)).await;
        Ok(())
    }
}
```

### Session Coordination Methods

Core session operations are handled within SimulatedExecutionContext:

```rust
impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let fg_executor = ForegroundExecutor::new(scheduler)?;
        Ok(Self {
            fg_executor,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        })
    }

    pub fn create_session(&self) -> SessionId {
        let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
        let session_id = SessionId(session_counter);

        self.sessions.lock().insert(session_id, WorkerSession {
            spawned_tasks: HashSet::new(),
            wait_until_task_ids: HashSet::new(),
        });

        session_id
    }

    pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        {
            let mut current = self.current_session.lock();
            *current = Some(session_id);
        }

        let result = f();

        {
            let mut current = self.current_session.lock();
            *current = None;
        }

        result
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
        let sessions = self.sessions.lock();
        if let Some(session) = sessions.get(&session_id) {
            // Check running tasks using wrapper's TestScheduler access
            if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
                let dangling_tasks: Vec<TaskId> = session
                    .spawned_tasks
                    .iter()
                    .filter(|&&task_id| test_sched.is_task_running(task_id))
                    .copied()
                    .collect();

                // Cloud-specific permission check
                let unauthorized: Vec<_> = dangling_tasks
                    .into_iter()
                    .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
                    .collect();

                if !unauthorized.is_empty() {
                    return Err(platform_api::WorkerError::Other(anyhow!(
                        "Session cleanup failed: {} unauthorized tasks still running",
                        unauthorized.len()
                    )));
                }
            }
        }
        Ok(())
    }

    pub fn set_current_session(&self, session_id: SessionId) {
        *self.current_session.lock() = Some(session_id);
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        *self.current_session.lock()
    }
}
```

### Cloud-Specific Data Structures

Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers; a sketch of the per-session bookkeeping is shown below.
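The following is a small illustrative sketch of that bookkeeping, matching the `WorkerSession` shape used in the code above; the stand-alone `TaskId`/`SessionId` definitions and the `Sessions` alias are assumptions for the sake of a self-contained example.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct SessionId(usize);

// Per-session record kept inside SimulatedExecutionContext: every task the
// session spawned, plus the subset explicitly allowed to outlive the request
// via wait_until().
struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_task_ids: HashSet<TaskId>,
}

// Sessions are looked up by id when tasks are tracked and when cleanup is validated.
type Sessions = HashMap<SessionId, WorkerSession>;
```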
## Scheduler Trait Definition

```rust
pub trait Scheduler: Send + Sync {
    /// Schedule a runnable to be executed (object-safe core functionality)
    fn schedule(&self, runnable: Runnable);

    /// Schedule a runnable with label for test tracking
    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);

    /// Schedule a runnable on the main thread (optional, defaults to panic)
    fn schedule_foreground(&self, runnable: Runnable) {
        panic!("schedule_foreground not supported by this scheduler");
    }

    /// Schedule a runnable after a delay (object-safe for timers)
    fn schedule_after(&self, duration: Duration, runnable: Runnable);

    /// Platform integration methods
    fn park(&self, timeout: Option<Duration>) -> bool;
    fn unparker(&self) -> Unparker;
    fn is_main_thread(&self) -> bool;
    fn now(&self) -> Instant;
}
```

## TestScheduler (Concrete Implementation)

```rust
pub struct TestScheduler {
    inner: RefCell<TestSchedulerInner>,
}

struct TestSchedulerInner {
    tasks: HashMap<TaskId, TaskState>,
    task_labels: HashMap<TaskId, TaskLabel>,
    deprioritized_labels: HashSet<TaskLabel>,
    deprioritized_queue: VecDeque<(Runnable, TaskId)>,
    main_thread_queue: VecDeque<Runnable>,
    delayed: Vec<(Instant, Runnable)>,
    parker: Parker,
    is_main_thread: bool,
    now: Instant,
    next_task_id: AtomicUsize,
    rng: StdRng,
    waiting_tasks: HashSet<TaskId>,
    parking_allowed: bool,
    waiting_hint: Option<String>,
    block_tick_range: std::ops::RangeInclusive<usize>,
    creation_thread_id: ThreadId, // Added for wrapper checks
}

impl Scheduler for TestScheduler {
    fn schedule(&self, runnable: Runnable) {
        // Implementation as before
    }

    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
        // Implementation as before, with deprioritization
    }

    fn schedule_foreground(&self, runnable: Runnable) {
        assert!(
            thread::current().id() == self.inner.borrow().creation_thread_id,
            "schedule_foreground called off main thread"
        );
        // Implementation as before
    }

    // Other trait methods unchanged
}

impl TestScheduler {
    // Test-specific methods (NOT on main trait)
    pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
    pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
    pub fn tick(&self) -> bool { /* implementation */ }
    pub fn run_until_parked(&self) { /* implementation */ }
    pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
    pub fn assert_main_thread(&self) { /* implementation */ }
}
```
## Generic Spawn Helpers

Generic spawn methods are implemented for `dyn Scheduler` and are called by the wrappers; a minimal sketch follows.
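The helper below is an illustrative sketch written against the `Scheduler` trait defined above and the `Task(async_task::Task<R>)` wrapper from `crates/scheduler/src/scheduler.rs`; the `spawn_on` name is an assumption, not an API from the diff.

```rust
use std::future::Future;
use std::sync::Arc;

use async_task::Runnable;

// Generic helper: package a Send future into a Runnable and hand it to
// whichever Scheduler implementation sits behind the trait object.
pub fn spawn_on<R>(
    scheduler: &Arc<dyn Scheduler>,
    future: impl Future<Output = R> + Send + 'static,
) -> Task<R>
where
    R: Send + 'static,
{
    // The schedule callback re-enqueues the task on every wake, so it keeps
    // its own handle to the trait object.
    let scheduler = Arc::clone(scheduler);
    let (runnable, task) = async_task::spawn(future, move |runnable: Runnable| {
        scheduler.schedule(runnable);
    });
    // Kick off the first poll.
    runnable.schedule();
    Task(task)
}
```

A wrapper method such as `Executor::spawn` would then reduce to a call like `spawn_on(&self.scheduler, future)`.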
## Migration Strategy

### Phase 1: Core Infrastructure
1. Define `Scheduler` trait (core methods only)
2. Implement `TestScheduler` with thread ID tracking
3. Add wrapper structs `Executor` and `ForegroundExecutor`
4. Make existing GPUI platform dispatchers implement the `Scheduler` trait
5. Add `as_any()` to `Scheduler` for downcasting

### Phase 2: GPUI Migration
1. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers
2. Handle downcasting in wrappers for test features
3. Preserve all existing GPUI functionality
4. Test deployments use TestScheduler, production uses minimal schedulers

### Phase 3: Cloud Integration
1. Update `SimulatedExecutionContext` to use `ForegroundExecutor`
2. Move session coordination logic into `SimulatedExecutionContext`
3. Integrate `wait_until()` with wrapper scheduling
4. Use TestScheduler features for session validation via downcast
5. Preserve all existing Cloud platform APIs

### Phase 4: Testing & Validation
1. GPUI tests work with new architecture
2. Cloud session behavior preserved (single-threaded)
3. Production efficiency maintained
4. Both domains benefit from unified test infrastructure

## Platform Backend Files

### GPUI Backends
- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` → `TestScheduler` (moved to shared crate)

### Cloud Backends
- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor`

## Compatibility Checklist

## Complete GPUI + Cloud Feature Coverage ✅

### GPUI Compatibility
- ✅ `spawn()` → `Executor::spawn()` or `ForegroundExecutor::spawn()`
- ✅ `spawn_labeled(label)` → Wrappers delegate to `dyn Scheduler::spawn_labeled()`
- ✅ `timer(duration)` → Wrappers delegate to `dyn Scheduler::timer()`
- ✅ `block(future)` → Wrappers handle with parking
- ✅ `block_with_timeout(future, timeout)` → Wrappers handle
- ✅ `now()` → `scheduler.now()` (direct trait method)
- ✅ `is_main_thread()` → `scheduler.is_main_thread()` (direct trait method)
- ✅ `num_cpus()` → Generic helper on wrappers
- ✅ `deprioritize(label)` → Downcast in wrappers, then `TestScheduler::deprioritize()`
- ✅ `tick()` → Downcast in wrappers, then `TestScheduler::tick()`
- ✅ `run_until_parked()` → Downcast in wrappers, then `TestScheduler::run_until_parked()`
- ✅ `advance_clock(duration)` → Downcast in wrappers, then `TestScheduler::advance_clock()`
- ✅ `simulate_random_delay()` → Downcast in wrappers, then `TestScheduler::simulate_random_delay()`
- ✅ `BackgroundExecutor` → Uses `Executor` wrapper

### Cloud Compatibility
- ✅ **Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor`
- ✅ **Task Lifecycle**: Uses wrapper's TestScheduler access for validation
- ✅ **Worker Management**: Session context and cleanup validation
- ✅ **Background Tasks**: Explicit permission system for long-running work
- ✅ **Deterministic Testing**: Full TestScheduler integration with session tracking
- ✅ **Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer
- ✅ **Session Validation**: Dangling task detection with proper error reporting
- ✅ **Auto-Association**: Tasks automatically linked to sessions during spawn

### Unified Benefits
- ✅ **Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
- ✅ **Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation
- ✅ **Composability**: Session coordination built on unified scheduling primitives
- ✅ **Domain-Specific**: Each domain handles its coordination concerns appropriately
- ✅ **Test Infrastructure**: Shared deterministic testing capabilities
- ✅ **Production Ready**: Both domains can use minimal platform schedulers
- ✅ **Extensible**: New coordination patterns can be added without shared crate changes
- ✅ **Thread Safety**: ForegroundExecutor enforces main-thread use across domains

## Implementation Notes

### Key Design Decisions

1. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send)
2. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
3. **Shared**: Core scheduling primitives + wrappers for delegation and safety
4. **Integration**: Both domains use wrappers with consistent API

### Migration Considerations

- **Zero Breaking Changes**: Existing APIs preserved via wrappers
- **Gradual Migration**: Can migrate GPUI and Cloud independently
- **Test Preservation**: All existing test functionality maintained
- **Performance**: Minimal overhead from trait objects in production
- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in a single-threaded context

This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.