Compare commits

...

35 Commits

Author SHA1 Message Date
Antonio Scandurra
19d74e4407 Expire timers one at a time on advance_clock 2025-09-09 12:49:20 +02:00
Antonio Scandurra
02a8a048b1 Make capturing pending traces opt-in
This speeds up tests because we don't need to resolve the backtrace.
2025-09-09 12:00:07 +02:00
Antonio Scandurra
b937a5b898 Merge remote-tracking branch 'origin/main' into use-new-scheduler
# Conflicts:
#	crates/channel/src/channel_store_tests.rs
#	crates/gpui/src/platform.rs
2025-09-09 10:34:44 +02:00
Antonio Scandurra
f943b0c379 Checkpoint 2025-09-09 10:33:45 +02:00
Antonio Scandurra
d4876bc335 Run before advancing clock 2025-09-08 20:08:06 +02:00
Antonio Scandurra
a467dd069e Remove outstanding todos 2025-09-08 19:59:29 +02:00
Antonio Scandurra
effabb531b Remove spawn_labeled/deprioritize
We are now simulating more interesting cases in the test scheduler by having
longer delays, a smaller percentage of the time. For the tests that used the
`deprioritize` method, I managed to reproduce the test failures by undoing the
bugfix.
2025-09-08 17:56:40 +02:00
Antonio Scandurra
cb4607cc85 Checkpoint 2025-09-08 16:55:38 +02:00
Antonio Scandurra
58ea61af68 Remove more todos 2025-09-08 15:41:20 +02:00
Antonio Scandurra
d86f5a6015 Checkpoint 2025-09-08 14:11:13 +02:00
Antonio Scandurra
57e5a38100 Move block to ForegroundExecutor 2025-09-08 09:40:43 +02:00
Antonio Scandurra
6789228bb4 Move block_with_timeout to ForegroundExecutor 2025-09-06 18:06:44 +02:00
Antonio Scandurra
a4a2002556 Remove tick 2025-09-06 14:00:08 +02:00
Antonio Scandurra
05f3dcb292 Remove block_test 2025-09-06 13:58:37 +02:00
Antonio Scandurra
d37cd86ad2 Exclude wakers from trace 2025-09-06 13:53:37 +02:00
Antonio Scandurra
d90b621648 WIP 2025-09-05 18:09:16 +02:00
Antonio Scandurra
03a2916473 WIP 2025-09-05 17:58:38 +02:00
Antonio Scandurra
cfeaad1038 Remove helpers for tracking where a test was hanging
This will now happen automatically with the new scheduler.

Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-05 17:16:07 +02:00
Antonio Scandurra
396b9803b5 Checkpoint
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-05 17:13:26 +02:00
Antonio Scandurra
ebe835dd91 Checkpoint 2025-09-05 10:37:36 +02:00
Antonio Scandurra
06f150e7ca Port dispatcher implementation to macOS 2025-09-05 10:20:33 +02:00
Antonio Scandurra
3b351fc21b Avoid using timer(Duration::MAX), as that might fire in tests 2025-09-05 09:46:27 +02:00
Antonio Scandurra
846aec7a87 Implement dispatch_after in terms of the new scheduler 2025-09-05 09:45:52 +02:00
Nathan Sobo
45d299a717 Use timer with Duration::MAX to replace spawning::pending so timer types are consistent 2025-09-04 22:52:07 -06:00
Nathan Sobo
eaeceed5b3 WIP 2025-09-04 22:44:19 -06:00
Nathan Sobo
90b0c3fa7b WIP 2025-09-04 22:28:48 -06:00
Nathan Sobo
34d0a1b6b6 Update extension_host to use new scheduler timer API
Replace background_spawn(futures::future::pending()) with background_executor().timer(Duration::default()) to align with the new scheduler API
2025-09-04 22:23:41 -06:00
Antonio Scandurra
ad976c9434 WIP
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 19:50:55 +02:00
Antonio Scandurra
8c2a46ce2b Remove is_main_thread
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 19:23:28 +02:00
Antonio Scandurra
82b7e63cf8 Checkpoint
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 19:08:13 +02:00
Antonio Scandurra
39ba40722e Checkpoint
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 18:56:50 +02:00
Antonio Scandurra
b49bdbbe8c Checkpoint
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 18:52:49 +02:00
Antonio Scandurra
b275534f28 Move block_on to ForegroundExecutor and exclude tasks from that
session

Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 18:18:34 +02:00
Antonio Scandurra
4ddfbb70e6 WIP 2025-09-04 17:26:12 +02:00
Antonio Scandurra
4e5a8f910e WIP
Co-authored-by: Nathan Sobo <nathan@zed.dev>
2025-09-04 17:26:12 +02:00
61 changed files with 1147 additions and 1508 deletions

5
Cargo.lock generated
View File

@@ -7382,6 +7382,7 @@ dependencies = [
"calloop",
"calloop-wayland-source",
"cbindgen",
"chrono",
"cocoa 0.26.0",
"collections",
"core-foundation 0.10.0",
@@ -7428,6 +7429,7 @@ dependencies = [
"reqwest_client",
"resvg",
"scap",
"scheduler",
"schemars",
"seahash",
"semantic_version",
@@ -13550,6 +13552,7 @@ dependencies = [
"alacritty_terminal",
"anyhow",
"async-dispatcher",
"async-task",
"async-tungstenite",
"base64 0.22.1",
"client",
@@ -14348,9 +14351,9 @@ name = "scheduler"
version = "0.1.0"
dependencies = [
"async-task",
"backtrace",
"chrono",
"futures 0.3.31",
"parking",
"parking_lot",
"rand 0.9.1",
"workspace-hack",

View File

@@ -460,6 +460,7 @@ aws-sdk-bedrockruntime = { version = "1.80.0", features = [
] }
aws-smithy-runtime-api = { version = "1.7.4", features = ["http-1x", "client"] }
aws-smithy-types = { version = "1.3.0", features = ["http-body-1-x"] }
backtrace = "0.3"
base64 = "0.22"
bincode = "1.2.1"
bitflags = "2.6.0"
@@ -567,7 +568,6 @@ objc2-foundation = { version = "0.3", default-features = false, features = [
open = "5.0.0"
ordered-float = "2.1.1"
palette = { version = "0.7.5", default-features = false, features = ["std"] }
parking = "2.0"
parking_lot = "0.12.1"
partial-json-fixer = "0.5.3"
parse_int = "0.9"

View File

@@ -1621,7 +1621,7 @@ impl Thread {
is_input_complete: true,
};
let tool_output = cx.background_executor().block(tool_result.output);
let tool_output = cx.foreground_executor().block_on(tool_result.output);
// Attach a project_notification tool call to the latest existing
// Assistant message. We cannot create a new Assistant message

View File

@@ -2367,6 +2367,7 @@ async fn setup(cx: &mut TestAppContext, model: TestModel) -> ThreadTest {
watch_settings(fs.clone(), cx);
});
cx.run_until_parked();
let templates = Templates::new();

View File

@@ -303,7 +303,7 @@ pub(crate) fn search_symbols(
};
const MAX_MATCHES: usize = 100;
let mut visible_matches = cx.background_executor().block(fuzzy::match_strings(
let mut visible_matches = cx.foreground_executor().block_on(fuzzy::match_strings(
&visible_match_candidates,
&query,
false,
@@ -312,7 +312,7 @@ pub(crate) fn search_symbols(
&cancellation_flag,
cx.background_executor().clone(),
));
let mut external_matches = cx.background_executor().block(fuzzy::match_strings(
let mut external_matches = cx.foreground_executor().block_on(fuzzy::match_strings(
&external_match_candidates,
&query,
false,

View File

@@ -2,7 +2,10 @@ use std::{cmp::Reverse, sync::Arc};
use collections::{HashSet, IndexMap};
use fuzzy::{StringMatch, StringMatchCandidate, match_strings};
use gpui::{Action, AnyElement, App, BackgroundExecutor, DismissEvent, Subscription, Task};
use gpui::{
Action, AnyElement, App, BackgroundExecutor, DismissEvent, ForegroundExecutor, Subscription,
Task,
};
use language_model::{
AuthenticateError, ConfiguredModel, LanguageModel, LanguageModelProviderId,
LanguageModelRegistry,
@@ -275,22 +278,28 @@ enum LanguageModelPickerEntry {
struct ModelMatcher {
models: Vec<ModelInfo>,
fg_executor: ForegroundExecutor,
bg_executor: BackgroundExecutor,
candidates: Vec<StringMatchCandidate>,
}
impl ModelMatcher {
fn new(models: Vec<ModelInfo>, bg_executor: BackgroundExecutor) -> ModelMatcher {
fn new(
models: Vec<ModelInfo>,
fg_executor: ForegroundExecutor,
bg_executor: BackgroundExecutor,
) -> ModelMatcher {
let candidates = Self::make_match_candidates(&models);
Self {
models,
fg_executor,
bg_executor,
candidates,
}
}
pub fn fuzzy_search(&self, query: &str) -> Vec<ModelInfo> {
let mut matches = self.bg_executor.block(match_strings(
let mut matches = self.fg_executor.block_on(match_strings(
&self.candidates,
query,
false,
@@ -386,6 +395,7 @@ impl PickerDelegate for LanguageModelPickerDelegate {
) -> Task<()> {
let all_models = self.all_models.clone();
let active_model = (self.get_active_model)(cx);
let fg_executor = cx.foreground_executor();
let bg_executor = cx.background_executor();
let language_model_registry = LanguageModelRegistry::global(cx);
@@ -416,8 +426,10 @@ impl PickerDelegate for LanguageModelPickerDelegate {
.cloned()
.collect::<Vec<_>>();
let matcher_rec = ModelMatcher::new(recommended_models, bg_executor.clone());
let matcher_all = ModelMatcher::new(available_models, bg_executor.clone());
let matcher_rec =
ModelMatcher::new(recommended_models, fg_executor.clone(), bg_executor.clone());
let matcher_all =
ModelMatcher::new(available_models, fg_executor.clone(), bg_executor.clone());
let recommended = matcher_rec.exact_search(&query);
let all = matcher_all.fuzzy_search(&query);
@@ -690,7 +702,7 @@ mod tests {
("ollama", "mistral"),
("ollama", "deepseek"),
]);
let matcher = ModelMatcher::new(models, cx.background_executor.clone());
let matcher = ModelMatcher::new(models, cx.foreground_executor(), cx.executor());
// The order of models should be maintained, case doesn't matter
let results = matcher.exact_search("GPT-4.1");
@@ -718,7 +730,7 @@ mod tests {
("ollama", "mistral"),
("ollama", "deepseek"),
]);
let matcher = ModelMatcher::new(models, cx.background_executor.clone());
let matcher = ModelMatcher::new(models, cx.foreground_executor(), cx.executor());
// Results should preserve models order whenever possible.
// In the case below, `zed/gpt-4.1` and `openai/gpt-4.1` have identical

View File

@@ -11,7 +11,7 @@ use client::{Client, UserStore};
use collections::HashMap;
use fs::FakeFs;
use futures::{FutureExt, future::LocalBoxFuture};
use gpui::{AppContext, TestAppContext, Timer};
use gpui::{AppContext, TestAppContext, TestScheduler, TestSchedulerConfig, Timer};
use http_client::StatusCode;
use indoc::{formatdoc, indoc};
use language_model::{
@@ -1399,9 +1399,9 @@ fn eval(
}
fn run_eval(eval: EvalInput, tx: mpsc::Sender<Result<EvalOutput>>) {
let dispatcher = gpui::TestDispatcher::new(StdRng::from_os_rng());
let mut cx = TestAppContext::build(dispatcher, None);
let output = cx.executor().block_test(async {
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let mut cx = TestAppContext::build(scheduler, None);
let output = cx.foreground_executor().block_on(async {
let test = EditAgentTest::new(&mut cx).await;
test.eval(eval, &mut cx).await
});

View File

@@ -1,21 +1,13 @@
use futures::channel::oneshot;
use git2::{DiffLineType as GitDiffLineType, DiffOptions as GitOptions, Patch as GitPatch};
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, TaskLabel};
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task};
use language::{Language, LanguageRegistry};
use rope::Rope;
use std::{
cmp::Ordering,
future::Future,
iter,
ops::Range,
sync::{Arc, LazyLock},
};
use std::{cmp::Ordering, future::Future, iter, ops::Range, sync::Arc};
use sum_tree::SumTree;
use text::{Anchor, Bias, BufferId, OffsetRangeExt, Point, ToOffset as _};
use util::ResultExt;
pub static CALCULATE_DIFF_TASK: LazyLock<TaskLabel> = LazyLock::new(TaskLabel::new);
pub struct BufferDiff {
pub buffer_id: BufferId,
inner: BufferDiffInner,
@@ -201,12 +193,10 @@ impl BufferDiffSnapshot {
base_text_exists = false;
};
let hunks = cx
.background_executor()
.spawn_labeled(*CALCULATE_DIFF_TASK, {
let buffer = buffer.clone();
async move { compute_hunks(base_text_pair, buffer) }
});
let hunks = cx.background_executor().spawn({
let buffer = buffer.clone();
async move { compute_hunks(base_text_pair, buffer) }
});
async move {
let (base_text, hunks) = futures::join!(base_text_snapshot, hunks);
@@ -233,18 +223,17 @@ impl BufferDiffSnapshot {
debug_assert_eq!(&*text, &base_text_snapshot.text());
(text, base_text_snapshot.as_rope().clone())
});
cx.background_executor()
.spawn_labeled(*CALCULATE_DIFF_TASK, async move {
Self {
inner: BufferDiffInner {
base_text: base_text_snapshot,
pending_hunks: SumTree::new(&buffer),
hunks: compute_hunks(base_text_pair, buffer),
base_text_exists,
},
secondary_diff: None,
}
})
cx.background_executor().spawn(async move {
Self {
inner: BufferDiffInner {
base_text: base_text_snapshot,
pending_hunks: SumTree::new(&buffer),
hunks: compute_hunks(base_text_pair, buffer),
base_text_exists,
},
secondary_diff: None,
}
})
}
#[cfg(test)]
@@ -253,7 +242,7 @@ impl BufferDiffSnapshot {
diff_base: String,
cx: &mut gpui::TestAppContext,
) -> BufferDiffSnapshot {
cx.executor().block(cx.update(|cx| {
cx.foreground_executor().block_on(cx.update(|cx| {
Self::new_with_base_text(buffer, Some(Arc::new(diff_base)), None, None, cx)
}))
}
@@ -919,7 +908,7 @@ impl BufferDiff {
None,
cx,
);
let snapshot = cx.background_executor().block(snapshot);
let snapshot = cx.foreground_executor().block_on(snapshot);
Self {
buffer_id: buffer.read(cx).remote_id(),
inner: snapshot.inner,
@@ -1222,7 +1211,7 @@ impl BufferDiff {
self.inner.base_text.clone(),
cx,
);
let snapshot = cx.background_executor().block(snapshot);
let snapshot = cx.foreground_executor().block_on(snapshot);
self.set_snapshot(snapshot, &buffer, cx);
}
}

View File

@@ -3,7 +3,7 @@ use anyhow::{Context as _, Result, anyhow};
use cloud_api_client::{AuthenticatedUser, GetAuthenticatedUserResponse, PlanInfo};
use cloud_llm_client::{CurrentUsage, Plan, UsageData, UsageLimit};
use futures::{StreamExt, stream::BoxStream};
use gpui::{AppContext as _, BackgroundExecutor, Entity, TestAppContext};
use gpui::{AppContext as _, Entity, TestAppContext};
use http_client::{AsyncBody, Method, Request, http};
use parking_lot::Mutex;
use rpc::{ConnectionId, Peer, Receipt, TypedEnvelope, proto};
@@ -13,7 +13,6 @@ pub struct FakeServer {
peer: Arc<Peer>,
state: Arc<Mutex<FakeServerState>>,
user_id: u64,
executor: BackgroundExecutor,
}
#[derive(Default)]
@@ -35,7 +34,6 @@ impl FakeServer {
peer: Peer::new(0),
state: Default::default(),
user_id: client_user_id,
executor: cx.executor(),
};
client.http_client().as_fake().replace_handler({
@@ -181,8 +179,6 @@ impl FakeServer {
#[allow(clippy::await_holding_lock)]
pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
self.executor.start_waiting();
let message = self
.state
.lock()
@@ -192,7 +188,6 @@ impl FakeServer {
.next()
.await
.context("other half hung up")?;
self.executor.finish_waiting();
let type_name = message.payload_type_name();
let message = message.into_any();

View File

@@ -253,7 +253,6 @@ async fn test_channel_notes_participant_indices(
let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
// Clients A and B open the same file.
executor.start_waiting();
let editor_a = workspace_a
.update_in(cx_a, |workspace, window, cx| {
workspace.open_path((worktree_id_a, "file.txt"), None, true, window, cx)
@@ -262,7 +261,6 @@ async fn test_channel_notes_participant_indices(
.unwrap()
.downcast::<Editor>()
.unwrap();
executor.start_waiting();
let editor_b = workspace_b
.update_in(cx_b, |workspace, window, cx| {
workspace.open_path((worktree_id_a, "file.txt"), None, true, window, cx)

View File

@@ -364,7 +364,6 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu
// Receive a completion request as the host's language server.
// Return some completions from the host's language server.
cx_a.executor().start_waiting();
fake_language_server
.set_request_handler::<lsp::request::Completion, _, _>(|params, _| async move {
assert_eq!(
@@ -408,7 +407,6 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu
.next()
.await
.unwrap();
cx_a.executor().finish_waiting();
// Open the buffer on the host.
let buffer_a = project_a
@@ -1712,7 +1710,6 @@ async fn test_on_input_format_from_guest_to_host(
// Receive an OnTypeFormatting request as the host's language server.
// Return some formatting from the host's language server.
executor.start_waiting();
fake_language_server
.set_request_handler::<lsp::request::OnTypeFormatting, _, _>(|params, _| async move {
assert_eq!(
@@ -1732,7 +1729,6 @@ async fn test_on_input_format_from_guest_to_host(
.next()
.await
.unwrap();
executor.finish_waiting();
// Open the buffer on the host and see that the formatting worked
let buffer_a = project_a
@@ -1872,7 +1868,6 @@ async fn test_mutual_editor_inlay_hint_cache_update(
.unwrap();
let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
executor.start_waiting();
// The host opens a rust file.
let _buffer_a = project_a
@@ -2122,8 +2117,6 @@ async fn test_inlay_hint_refresh_is_forwarded(
let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
cx_a.background_executor.start_waiting();
let editor_a = workspace_a
.update_in(cx_a, |workspace, window, cx| {
workspace.open_path((worktree_id, "main.rs"), None, true, window, cx)
@@ -2175,7 +2168,6 @@ async fn test_inlay_hint_refresh_is_forwarded(
.next()
.await
.unwrap();
executor.finish_waiting();
executor.run_until_parked();
editor_a.update(cx_a, |editor, _| {
@@ -2583,7 +2575,6 @@ async fn test_lsp_pull_diagnostics(
.unwrap();
let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
executor.start_waiting();
// The host opens a rust file.
let _buffer_a = project_a

View File

@@ -174,9 +174,7 @@ pub async fn run_randomized_test<T: RandomizedTest>(
}
drop(operation_channels);
executor.start_waiting();
futures::future::join_all(client_tasks).await;
executor.finish_waiting();
executor.run_until_parked();
T::on_quiesce(&mut server, &mut clients).await;
@@ -523,10 +521,8 @@ impl<T: RandomizedTest> TestPlan<T> {
server.forbid_connections();
server.disconnect_client(removed_peer_id);
deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
deterministic.start_waiting();
log::info!("waiting for user {} to exit...", removed_user_id);
client_task.await;
deterministic.finish_waiting();
server.allow_connections();
for project in client.dev_server_projects().iter() {

View File

@@ -499,7 +499,7 @@ impl CollabPanel {
self.match_candidates.clear();
self.match_candidates
.push(StringMatchCandidate::new(0, &user.github_login));
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -543,7 +543,7 @@ impl CollabPanel {
&participant.user.github_login,
)
}));
let mut matches = executor.block(match_strings(
let mut matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -595,7 +595,7 @@ impl CollabPanel {
StringMatchCandidate::new(id, &participant.github_login)
},
));
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -626,7 +626,7 @@ impl CollabPanel {
.enumerate()
.map(|(ix, (_, channel))| StringMatchCandidate::new(ix, &channel.name)),
);
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -701,7 +701,7 @@ impl CollabPanel {
.enumerate()
.map(|(ix, channel)| StringMatchCandidate::new(ix, &channel.name)),
);
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -737,7 +737,7 @@ impl CollabPanel {
.enumerate()
.map(|(ix, user)| StringMatchCandidate::new(ix, &user.github_login)),
);
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -762,7 +762,7 @@ impl CollabPanel {
.enumerate()
.map(|(ix, user)| StringMatchCandidate::new(ix, &user.github_login)),
);
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,
@@ -796,7 +796,7 @@ impl CollabPanel {
.map(|(ix, contact)| StringMatchCandidate::new(ix, &contact.user.github_login)),
);
let matches = executor.block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,

View File

@@ -295,7 +295,7 @@ impl PickerDelegate for ChannelModalDelegate {
StringMatchCandidate::new(id, &member.user.github_login)
}));
let matches = cx.background_executor().block(match_strings(
let matches = cx.foreground_executor().block_on(match_strings(
&self.match_candidates,
&query,
true,

View File

@@ -3,6 +3,7 @@ mod persistence;
use std::{
cmp::{self, Reverse},
collections::HashMap,
pin::pin,
sync::Arc,
time::Duration,
};
@@ -357,9 +358,13 @@ impl PickerDelegate for CommandPaletteDelegate {
return true;
};
let result = pin!({
let mut rx = rx.clone();
async move { rx.recv().await }
});
match cx
.background_executor()
.block_with_timeout(duration, rx.clone().recv())
.foreground_executor()
.block_with_timeout(duration, result)
{
Ok(Some((commands, matches))) => {
self.matches_updated(query, commands, matches, cx);

View File

@@ -190,7 +190,7 @@ impl WrapMap {
});
match cx
.background_executor()
.foreground_executor()
.block_with_timeout(Duration::from_millis(5), task)
{
Ok((snapshot, edits)) => {
@@ -269,7 +269,7 @@ impl WrapMap {
});
match cx
.background_executor()
.foreground_executor()
.block_with_timeout(Duration::from_millis(1), update_task)
{
Ok((snapshot, output_edits)) => {

View File

@@ -10658,7 +10658,6 @@ async fn test_document_format_during_save(cx: &mut TestAppContext) {
});
assert!(cx.read(|cx| editor.is_dirty(cx)));
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
{
@@ -10688,7 +10687,6 @@ async fn test_document_format_during_save(cx: &mut TestAppContext) {
)
})
.unwrap();
cx.executor().start_waiting();
save.await;
assert_eq!(
@@ -10729,7 +10727,6 @@ async fn test_document_format_during_save(cx: &mut TestAppContext) {
})
.unwrap();
cx.executor().advance_clock(super::FORMAT_TIMEOUT);
cx.executor().start_waiting();
save.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -10775,7 +10772,6 @@ async fn test_document_format_during_save(cx: &mut TestAppContext) {
)
})
.unwrap();
cx.executor().start_waiting();
save.await;
}
}
@@ -10842,7 +10838,6 @@ async fn test_redo_after_noop_format(cx: &mut TestAppContext) {
)
})
.unwrap();
cx.executor().start_waiting();
save.await;
assert!(!cx.read(|cx| editor.is_dirty(cx)));
}
@@ -11011,7 +11006,6 @@ async fn test_multibuffer_format_during_save(cx: &mut TestAppContext) {
});
cx.executor().run_until_parked();
cx.executor().start_waiting();
let save = multi_buffer_editor
.update_in(cx, |editor, window, cx| {
editor.save(
@@ -11273,7 +11267,6 @@ async fn setup_range_format_test(
build_editor_with_project(project.clone(), buffer, window, cx)
});
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
(project, editor, cx, fake_server)
@@ -11315,7 +11308,6 @@ async fn test_range_format_on_save_success(cx: &mut TestAppContext) {
})
.next()
.await;
cx.executor().start_waiting();
save.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -11358,7 +11350,6 @@ async fn test_range_format_on_save_timeout(cx: &mut TestAppContext) {
})
.unwrap();
cx.executor().advance_clock(super::FORMAT_TIMEOUT);
cx.executor().start_waiting();
save.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -11390,7 +11381,6 @@ async fn test_range_format_not_called_for_clean_buffer(cx: &mut TestAppContext)
panic!("Should not be invoked");
})
.next();
cx.executor().start_waiting();
save.await;
cx.run_until_parked();
}
@@ -11500,7 +11490,6 @@ async fn test_document_format_manual_trigger(cx: &mut TestAppContext) {
editor.set_text("one\ntwo\nthree\n", window, cx)
});
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
let format = editor
@@ -11528,7 +11517,6 @@ async fn test_document_format_manual_trigger(cx: &mut TestAppContext) {
})
.next()
.await;
cx.executor().start_waiting();
format.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -11561,7 +11549,6 @@ async fn test_document_format_manual_trigger(cx: &mut TestAppContext) {
})
.unwrap();
cx.executor().advance_clock(super::FORMAT_TIMEOUT);
cx.executor().start_waiting();
format.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -11622,8 +11609,6 @@ async fn test_multiple_formatters(cx: &mut TestAppContext) {
build_editor_with_project(project.clone(), buffer, window, cx)
});
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
fake_server.set_request_handler::<lsp::request::Formatting, _, _>(
move |_params, _| async move {
@@ -11722,7 +11707,6 @@ async fn test_multiple_formatters(cx: &mut TestAppContext) {
}
});
cx.executor().start_waiting();
editor
.update_in(cx, |editor, window, cx| {
editor.perform_format(
@@ -11893,7 +11877,6 @@ async fn test_organize_imports_manual_trigger(cx: &mut TestAppContext) {
)
});
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
let format = editor
@@ -11939,7 +11922,6 @@ async fn test_organize_imports_manual_trigger(cx: &mut TestAppContext) {
})
.next()
.await;
cx.executor().start_waiting();
format.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -11975,7 +11957,6 @@ async fn test_organize_imports_manual_trigger(cx: &mut TestAppContext) {
})
.unwrap();
cx.executor().advance_clock(super::CODE_ACTION_TIMEOUT);
cx.executor().start_waiting();
format.await;
assert_eq!(
editor.update(cx, |editor, cx| editor.text(cx)),
@@ -12028,9 +12009,7 @@ async fn test_concurrent_format_requests(cx: &mut TestAppContext) {
// Wait for both format requests to complete
cx.executor().advance_clock(Duration::from_millis(200));
cx.executor().start_waiting();
format_1.await.unwrap();
cx.executor().start_waiting();
format_2.await.unwrap();
// The formatting edits only happens once.
@@ -16685,7 +16664,6 @@ async fn test_on_type_formatting_not_triggered(cx: &mut TestAppContext) {
.downcast::<Editor>()
.unwrap();
cx.executor().start_waiting();
let fake_server = fake_servers.next().await.unwrap();
fake_server.set_request_handler::<lsp::request::OnTypeFormatting, _, _>(

View File

@@ -106,7 +106,7 @@ impl Editor {
// Try to resolve the indent in a short amount of time, otherwise move it to a background task.
match cx
.background_executor()
.foreground_executor()
.block_with_timeout(Duration::from_micros(200), task)
{
Ok(result) => state.active_indent_range = result,

View File

@@ -8,10 +8,9 @@ use extension::{
};
use extension_host::wasm_host::WasmHost;
use fs::RealFs;
use gpui::{SemanticVersion, TestAppContext, TestDispatcher};
use gpui::{SemanticVersion, TestAppContext, TestScheduler};
use http_client::{FakeHttpClient, Response};
use node_runtime::NodeRuntime;
use rand::{SeedableRng, rngs::StdRng};
use reqwest_client::ReqwestClient;
use serde_json::json;
use settings::SettingsStore;
@@ -36,8 +35,8 @@ fn extension_benchmarks(c: &mut Criterion) {
|| wasm_bytes.clone(),
|wasm_bytes| {
let _extension = cx
.executor()
.block(wasm_host.load_extension(wasm_bytes, &manifest, cx.executor()))
.foreground_executor()
.block_on(wasm_host.load_extension(wasm_bytes, &manifest, cx.executor()))
.unwrap();
},
BatchSize::SmallInput,
@@ -46,9 +45,10 @@ fn extension_benchmarks(c: &mut Criterion) {
}
fn init() -> TestAppContext {
const SEED: u64 = 9999;
let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(SEED));
let cx = TestAppContext::build(dispatcher, None);
let scheduler = Arc::new(TestScheduler::new(gpui::TestSchedulerConfig::with_seed(
9999,
)));
let cx = TestAppContext::build(scheduler, None);
cx.executor().allow_parking();
cx.update(|cx| {
let store = SettingsStore::test(cx);
@@ -67,8 +67,8 @@ fn wasm_bytes(cx: &TestAppContext, manifest: &mut ExtensionManifest) -> Vec<u8>
.parent()
.unwrap()
.join("extensions/test-extension");
cx.executor()
.block(extension_builder.compile_extension(
cx.foreground_executor()
.block_on(extension_builder.compile_extension(
&path,
manifest,
CompileExtensionOptions { release: true },

View File

@@ -47,6 +47,7 @@ use remote::{RemoteClient, RemoteConnectionOptions};
use semantic_version::SemanticVersion;
use serde::{Deserialize, Serialize};
use settings::Settings;
use std::future;
use std::ops::RangeInclusive;
use std::str::FromStr;
use std::{
@@ -278,7 +279,7 @@ impl ExtensionStore {
// list of the installed extensions and the resources that they provide.
// This index is loaded synchronously on startup.
let (index_content, index_metadata, extensions_metadata) =
cx.background_executor().block(async {
cx.foreground_executor().block_on(async {
futures::join!(
this.fs.load(&this.index_path),
this.fs.metadata(&this.index_path),
@@ -331,7 +332,7 @@ impl ExtensionStore {
load_initial_extensions.await;
let mut index_changed = false;
let mut debounce_timer = cx.background_spawn(futures::future::pending()).fuse();
let mut debounce_timer = future::pending().boxed().fuse();
loop {
select_biased! {
_ = debounce_timer => {
@@ -350,6 +351,7 @@ impl ExtensionStore {
debounce_timer = cx
.background_executor()
.timer(RELOAD_DEBOUNCE_DURATION)
.boxed()
.fuse();
}
extension_id = reload_rx.next() => {
@@ -361,6 +363,7 @@ impl ExtensionStore {
debounce_timer = cx
.background_executor()
.timer(RELOAD_DEBOUNCE_DURATION)
.boxed()
.fuse();
}
}

View File

@@ -85,6 +85,7 @@ blade-graphics = { workspace = true, optional = true }
blade-macros = { workspace = true, optional = true }
blade-util = { workspace = true, optional = true }
bytemuck = { version = "1", optional = true }
chrono.workspace = true
collections.workspace = true
ctor.workspace = true
derive_more.workspace = true
@@ -110,6 +111,7 @@ resvg = { version = "0.45.0", default-features = false, features = [
"memmap-fonts",
] }
usvg = { version = "0.45.0", default-features = false }
scheduler.workspace = true
schemars.workspace = true
seahash = "4.1"
semantic_version.workspace = true

View File

@@ -603,10 +603,6 @@ impl App {
) -> Rc<AppCell> {
let executor = platform.background_executor();
let foreground_executor = platform.foreground_executor();
assert!(
executor.is_main_thread(),
"must construct App on main thread"
);
let text_system = Arc::new(TextSystem::new(platform.text_system()));
let entities = EntityMap::new();
@@ -712,7 +708,7 @@ impl App {
let futures = futures::future::join_all(futures);
if self
.background_executor
.foreground_executor
.block_with_timeout(SHUTDOWN_TIMEOUT, futures)
.is_err()
{

View File

@@ -3,13 +3,12 @@ use crate::{
BackgroundExecutor, BorrowAppContext, Bounds, Capslock, ClipboardItem, DrawPhase, Drawable,
Element, Empty, EventEmitter, ForegroundExecutor, Global, InputEvent, Keystroke, Modifiers,
ModifiersChangedEvent, MouseButton, MouseDownEvent, MouseMoveEvent, MouseUpEvent, Pixels,
Platform, Point, Render, Result, Size, Task, TestDispatcher, TestPlatform,
TestScreenCaptureSource, TestWindow, TextSystem, VisualContext, Window, WindowBounds,
WindowHandle, WindowOptions,
Platform, Point, Render, Result, Size, Task, TestPlatform, TestScreenCaptureSource, TestWindow,
TextSystem, VisualContext, Window, WindowBounds, WindowHandle, WindowOptions,
};
use anyhow::{anyhow, bail};
use futures::{Stream, StreamExt, channel::oneshot};
use rand::{SeedableRng, rngs::StdRng};
use scheduler::{TestScheduler, TestSchedulerConfig};
use std::{cell::RefCell, future::Future, ops::Deref, rc::Rc, sync::Arc, time::Duration};
/// A TestAppContext is provided to tests created with `#[gpui::test]`, it provides
@@ -23,7 +22,7 @@ pub struct TestAppContext {
#[doc(hidden)]
pub foreground_executor: ForegroundExecutor,
#[doc(hidden)]
pub dispatcher: TestDispatcher,
pub scheduler: Arc<TestScheduler>,
test_platform: Rc<TestPlatform>,
text_system: Arc<TextSystem>,
fn_name: Option<&'static str>,
@@ -121,10 +120,9 @@ impl AppContext for TestAppContext {
impl TestAppContext {
/// Creates a new `TestAppContext`. Usually you can rely on `#[gpui::test]` to do this for you.
pub fn build(dispatcher: TestDispatcher, fn_name: Option<&'static str>) -> Self {
let arc_dispatcher = Arc::new(dispatcher.clone());
let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
pub fn build(scheduler: Arc<TestScheduler>, fn_name: Option<&'static str>) -> Self {
let background_executor = BackgroundExecutor::new(scheduler.background());
let foreground_executor = ForegroundExecutor::new(scheduler.foreground());
let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
let asset_source = Arc::new(());
let http_client = http_client::FakeHttpClient::with_404_response();
@@ -134,7 +132,7 @@ impl TestAppContext {
app: App::new_app(platform.clone(), asset_source, http_client),
background_executor,
foreground_executor,
dispatcher,
scheduler,
test_platform: platform,
text_system,
fn_name,
@@ -144,8 +142,10 @@ impl TestAppContext {
/// Create a single TestAppContext, for non-multi-client tests
pub fn single() -> Self {
let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
Self::build(dispatcher, None)
Self::build(
Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(0))),
None,
)
}
/// The name of the test function that created this `TestAppContext`
@@ -160,7 +160,7 @@ impl TestAppContext {
/// returns a new `TestAppContext` re-using the same executors to interleave tasks.
pub fn new_app(&self) -> TestAppContext {
Self::build(self.dispatcher.clone(), self.fn_name)
Self::build(self.scheduler.clone(), self.fn_name)
}
/// Called by the test helper to end the test.
@@ -188,8 +188,8 @@ impl TestAppContext {
}
/// Returns an executor (for running tasks on the main thread)
pub fn foreground_executor(&self) -> &ForegroundExecutor {
&self.foreground_executor
pub fn foreground_executor(&self) -> ForegroundExecutor {
self.foreground_executor.clone()
}
#[expect(clippy::wrong_self_convention)]
@@ -646,11 +646,9 @@ impl<V> Entity<V> {
}
}
cx.borrow().background_executor().start_waiting();
rx.recv()
.await
.expect("view dropped with pending condition");
cx.borrow().background_executor().finish_waiting();
}
})
.await

View File

@@ -1,601 +1 @@
use crate::{App, PlatformDispatcher};
use async_task::Runnable;
use futures::channel::mpsc;
use smol::prelude::*;
use std::mem::ManuallyDrop;
use std::panic::Location;
use std::thread::{self, ThreadId};
use std::{
fmt::Debug,
marker::PhantomData,
mem,
num::NonZeroUsize,
pin::Pin,
rc::Rc,
sync::{
Arc,
atomic::{AtomicUsize, Ordering::SeqCst},
},
task::{Context, Poll},
time::{Duration, Instant},
};
use util::TryFutureExt;
use waker_fn::waker_fn;
#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// The platform-specific dispatcher that actually schedules runnables
    /// onto background threads (or the test dispatcher in tests).
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// The platform-specific dispatcher used to reach the main thread.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    // Zero-sized marker: `Rc` is `!Send`, which makes this whole type `!Send`.
    not_send: PhantomData<Rc<()>>,
}
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);

/// Internal state of a [`Task`]: either an immediately-available value,
/// or a handle to a future running on an executor.
#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),
    /// A task that is currently running.
    Spawned(async_task::Task<T>),
}

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value
    pub fn ready(val: T) -> Self {
        Self(TaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background
    pub fn detach(self) {
        // Only a spawned task holds a handle whose drop would cancel it;
        // a ready task has nothing running behind it, so there is nothing to do.
        if let Task(TaskState::Spawned(task)) = self {
            task.detach();
        }
    }
}
impl<E, T> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any
    /// errors that occur.
    ///
    /// The caller's source location is captured so that logged errors
    /// point at the call site rather than at this helper.
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}
impl<T> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY: we never move the `Task` out of the pinned reference; the
        // `Ready` arm only takes the inner `Option` value in place and the
        // `Spawned` arm delegates to `async_task::Task::poll` via `&mut`.
        match unsafe { self.get_unchecked_mut() } {
            // Polling again after the value was taken panics on `unwrap`,
            // consistent with the usual "don't poll after Ready" contract.
            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
            Task(TaskState::Spawned(task)) => task.poll(cx),
        }
    }
}
/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    /// Equivalent to [`TaskLabel::new`]: every default label is fresh.
    fn default() -> Self {
        Self::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    pub fn new() -> Self {
        // Process-wide counter; starts at 1 so the value is always non-zero.
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        let id = NEXT_TASK_LABEL.fetch_add(1, SeqCst);
        Self(NonZeroUsize::new(id).expect("label counter starts at 1 and only increments"))
    }
}
// Boxed future that is not `Send`; must be polled on the spawning thread.
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
// Boxed future that may be sent to, and polled on, a background thread.
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
/// BackgroundExecutor lets you run things on background threads.
/// In production this is a thread pool with no ordering guarantees.
/// In tests this is simulated by running tasks one by one in a deterministic
/// (but arbitrary) order controlled by the `SEED` environment variable.
impl BackgroundExecutor {
    #[doc(hidden)]
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self { dispatcher }
    }

    /// Enqueues the given future to be run to completion on a background thread.
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), None)
    }

    /// Enqueues the given future to be run to completion on a background thread.
    /// The given label can be used to control the priority of the task in tests.
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), Some(label))
    }

    // Common spawn path: wrap the boxed future in an `async_task` pair and
    // hand the runnable to the dispatcher every time it needs to be polled.
    fn spawn_internal<R: Send + 'static>(
        &self,
        future: AnyFuture<R>,
        label: Option<TaskLabel>,
    ) -> Task<R> {
        let dispatcher = self.dispatcher.clone();
        let (runnable, task) =
            async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable, label));
        // Schedule the first poll immediately.
        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        // `background_only = false`: the test body may also tick foreground tasks.
        if let Ok(value) = self.block_internal(false, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(true, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    // Production implementation: poll the future, parking the thread between
    // polls until the waker unparks it or the deadline passes. Returns the
    // still-pending future in `Err` on timeout so the caller can resume it.
    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;
        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);
        // Waking the future just unparks this thread so the loop re-polls.
        let unparker = self.dispatcher.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    // `park` returns false when it timed out rather than being
                    // unparked; only then do we check the deadline and give up.
                    if !self.dispatcher.park(timeout)
                        && deadline.is_some_and(|deadline| deadline < Instant::now())
                    {
                        return Err(future);
                    }
                }
            }
        }
    }

    // Test implementation: instead of parking, deterministically tick the
    // test dispatcher between polls, bounded by a seeded number of ticks when
    // a timeout was requested.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;
        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let Some(dispatcher) = self.dispatcher.as_test() else {
            return Err(future);
        };
        // A timeout is simulated as a random (seed-derived) tick budget.
        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };
        let unparker = self.dispatcher.unparker();
        // Track wake-ups so a pending future that was woken is re-polled
        // even when the dispatcher had no task to tick.
        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            move || {
                awoken.store(true, SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);
        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;
                    if !dispatcher.tick(background_only) {
                        // Nothing ran; if the waker fired in the meantime,
                        // loop around and poll again.
                        if awoken.swap(false, SeqCst) {
                            continue;
                        }
                        if !dispatcher.parking_allowed() {
                            // Try firing the next delayed task before declaring
                            // the test deadlocked.
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        // Parking was explicitly allowed (see `allow_parking`):
                        // wait for a real wake-up from outside the dispatcher.
                        self.dispatcher.park(None);
                    }
                }
            }
        }
    }

    /// Block the current thread until the given future resolves
    /// or `duration` has elapsed.
    pub fn block_with_timeout<Fut: Future>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        self.block_internal(true, future, Some(duration))
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
    pub async fn scoped<'scope, F>(&self, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn(f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.dispatcher.now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }
        // An empty future whose first (and only) poll is delayed by the
        // dispatcher: completing it is the timer firing.
        let (runnable, task) = async_task::spawn(async move {}, {
            let dispatcher = self.dispatcher.clone();
            move |runnable| dispatcher.dispatch_after(duration, runnable)
        });
        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
    #[cfg(any(test, feature = "test-support"))]
    pub fn start_waiting(&self) {
        self.dispatcher.as_test().unwrap().start_waiting();
    }

    /// in tests, removes the debugging data added by start_waiting
    #[cfg(any(test, feature = "test-support"))]
    pub fn finish_waiting(&self) {
        self.dispatcher.as_test().unwrap().finish_waiting();
    }

    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }

    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
    #[cfg(any(test, feature = "test-support"))]
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.dispatcher.as_test().unwrap().deprioritize(task_label)
    }

    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }

    /// in tests, run one task.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().tick(false)
    }

    /// in tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.dispatcher.as_test().unwrap().run_until_parked()
    }

    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher.as_test().unwrap().allow_parking();
    }

    /// undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher.as_test().unwrap().forbid_parking();
    }

    /// adds detail to the "parked with nothing left to run" message.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
    }

    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.dispatcher.as_test().unwrap().rng()
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        // Tests report a fixed CPU count to keep scheduling deterministic.
        #[cfg(any(test, feature = "test-support"))]
        return 4;
        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }

    /// Whether we're on the main thread.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }

    #[cfg(any(test, feature = "test-support"))]
    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
    }
}
/// ForegroundExecutor runs things on the main thread.
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self {
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given Task to run on the main thread at some point in the future.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        let dispatcher = self.dispatcher.clone();
        // Non-generic-over-the-future inner fn: the future is boxed first so
        // the spawn machinery is monomorphized only over `R`. `#[track_caller]`
        // is preserved so panics report the original spawn site.
        #[track_caller]
        fn inner<R: 'static>(
            dispatcher: Arc<dyn PlatformDispatcher>,
            future: AnyLocalFuture<R>,
        ) -> Task<R> {
            // Every time the task needs polling, its runnable is dispatched
            // back onto the main thread.
            let (runnable, task) = spawn_local_with_source_location(future, move |runnable| {
                dispatcher.dispatch_on_main_thread(runnable)
            });
            runnable.schedule();
            Task(TaskState::Spawned(task))
        }
        inner::<R>(dispatcher, Box::pin(future))
    }
}
/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S>(
    future: Fut,
    schedule: S,
) -> (Runnable<()>, async_task::Task<Fut::Output, ()>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<()> + Send + Sync + 'static,
{
    // Returns the current thread's id, caching it in a thread-local.
    // Falls back to querying the thread directly if the TLS slot is
    // unavailable (e.g. during thread teardown).
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    // Wrapper that asserts, on every poll and on drop, that the inner future
    // is being used from the thread that created it, reporting the original
    // spawn location on violation.
    struct Checked<F> {
        id: ThreadId,
        inner: ManuallyDrop<F>,
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            // SAFETY: `inner` is dropped exactly once, here; no other code
            // path takes it out of the `ManuallyDrop`.
            unsafe {
                ManuallyDrop::drop(&mut self.inner);
            }
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            // SAFETY: structural pin projection to `inner`; `Checked` never
            // moves the wrapped future after it has been pinned.
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    // SAFETY: `spawn_unchecked` skips the `Send` bound; the `Checked` wrapper
    // enforces single-thread usage at runtime instead.
    unsafe { async_task::spawn_unchecked(future, schedule) }
}
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor the scoped futures are spawned on (and blocked on at drop).
    executor: BackgroundExecutor,
    // Futures registered via `spawn`, collected until the scope runs them.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each spawned future holds a clone of this sender; when all clones drop,
    // the channel closes, signalling completion. `None` once `drop` has begun.
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    // Ties the scope to the lifetime of the borrowed data its futures capture.
    lifetime: PhantomData<&'a ()>,
}
impl<'a> Scope<'a> {
    // Creates an empty scope bound to the given executor. The 1-capacity
    // channel is used purely as a completion signal (see `Drop`).
    fn new(executor: BackgroundExecutor) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                // Dropping the sender clone signals this future's completion.
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Release our own sender so only the spawned futures keep the
        // channel open.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}
pub use crate::executor2::*;

View File

@@ -0,0 +1,307 @@
use crate::App;
use futures::channel::mpsc;
use scheduler::Timer;
use smol::prelude::*;
use std::sync::Arc;
use std::{
fmt::Debug,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use util::TryFutureExt;
pub use scheduler::{Scheduler, Yield};
#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// Thin wrapper delegating to the `scheduler` crate's background executor.
#[derive(Clone)]
pub struct BackgroundExecutor(scheduler::BackgroundExecutor);
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send`. This is because `ForegroundExecutor::spawn` does not require
/// `Send` but checks at runtime that the future is only polled from the same thread it was
/// spawned from. These checks would fail when spawning foreground tasks from background threads.
///
/// NOTE(review): the previous doc mentioned a `not_send` marker field, which no longer exists on
/// this wrapper; `!Send` presumably now comes from the wrapped `scheduler::ForegroundExecutor` —
/// confirm against the `scheduler` crate.
#[derive(Clone)]
pub struct ForegroundExecutor(scheduler::ForegroundExecutor);
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// Thin wrapper over the `scheduler` crate's task type.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value
    pub fn ready(val: T) -> Self {
        Task(scheduler::Task::ready(val))
    }

    /// Detaching a task runs it to completion in the background
    pub fn detach(self) {
        self.0.detach()
    }
}
impl<E, T> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any
    /// errors that occur.
    ///
    /// Captures the caller's source location so logged errors point at the
    /// call site rather than at this helper.
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}
impl<T> Future for Task<T> {
    type Output = T;

    // Note: no `mut` on `self` — `Pin::map_unchecked_mut` consumes the
    // `Pin<&mut Self>` by value, so a `mut` binding would be unused
    // (and trigger the `unused_mut` lint).
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY: structural pin projection to the single tuple field; the
        // inner task is never moved out of the pinned wrapper.
        unsafe { self.map_unchecked_mut(|t| &mut t.0) }.poll(cx)
    }
}
// Boxed future that is not `Send`; must be polled on the spawning thread.
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
// Boxed future that may be sent to, and polled on, a background thread.
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
/// BackgroundExecutor lets you run things on background threads.
/// In production this is a thread pool with no ordering guarantees.
/// In tests this is simulated by running tasks one by one in a deterministic
/// (but arbitrary) order controlled by the `SEED` environment variable.
impl BackgroundExecutor {
    /// Create a new BackgroundExecutor
    pub fn new(executor: scheduler::BackgroundExecutor) -> Self {
        Self(executor)
    }

    /// Enqueues the given future to be run to completion on a background thread.
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        // The future is boxed before the call so `inner` is monomorphized
        // only over `R`, keeping code size and compile times down.
        fn inner<R: Send + 'static>(
            executor: &scheduler::BackgroundExecutor,
            future: AnyFuture<R>,
        ) -> Task<R> {
            Task(executor.spawn(future))
        }
        inner(&self.0, Box::pin(future))
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
    pub async fn scoped<'scope, F>(&self, f: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone());
        (f)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn(f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.scheduler().clock().now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
    pub fn timer(&self, duration: Duration) -> Timer {
        self.0.timer(duration)
    }

    /// Get the underlying scheduler.
    pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
        // Return the inner borrow directly; taking `&` of it again would be a
        // needless double-borrow (clippy::needless_borrow).
        self.0.scheduler()
    }

    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> Yield {
        self.scheduler().as_test().yield_random()
    }

    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.scheduler().as_test().advance_clock(duration);
    }

    /// in tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.scheduler().as_test().run();
    }

    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.scheduler().as_test().allow_parking();
    }

    /// undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.scheduler().as_test().forbid_parking();
    }

    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.scheduler().as_test().rng().lock().clone()
    }

    /// How many CPUs are available for this executor.
    pub fn num_cpus(&self) -> usize {
        // Tests report a fixed CPU count to keep scheduling deterministic.
        #[cfg(any(test, feature = "test-support"))]
        return 4;
        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }

    #[cfg(any(test, feature = "test-support"))]
    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.scheduler().as_test().set_timeout_ticks(range)
    }
}
/// ForegroundExecutor runs things on the main thread.
impl ForegroundExecutor {
    /// Create a new ForegroundExecutor
    pub fn new(executor: scheduler::ForegroundExecutor) -> Self {
        Self(executor)
    }

    /// Enqueues the given Task to run on the main thread at some point in the future.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        // The future is boxed first so `inner` is monomorphized only over
        // `R`; `#[track_caller]` is preserved so panics report the original
        // spawn site.
        #[track_caller]
        fn inner<R: 'static>(
            executor: &scheduler::ForegroundExecutor,
            future: AnyLocalFuture<R>,
        ) -> Task<R> {
            Task(executor.spawn(future))
        }
        inner::<R>(&self.0, Box::pin(future))
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        self.0.block_on(future)
    }

    /// Block the current thread until the given future resolves
    /// or `timeout` has elapsed. Returns the still-pending future in `Err`
    /// so the caller may resume it.
    pub fn block_with_timeout<Fut: Unpin + Future>(
        &self,
        timeout: Duration,
        future: Fut,
    ) -> Result<Fut::Output, Fut> {
        self.0.block_with_timeout(timeout, future)
    }
}
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor the scoped futures are spawned on (and blocked on at drop).
    executor: BackgroundExecutor,
    // Futures registered via `spawn`, collected until the scope runs them.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each spawned future holds a clone of this sender; when all clones drop,
    // the channel closes, signalling completion. `None` once `drop` has begun.
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    // Ties the scope to the lifetime of the borrowed data its futures capture.
    lifetime: PhantomData<&'a ()>,
}
impl<'a> Scope<'a> {
    // Creates an empty scope bound to the given executor. The 1-capacity
    // channel is used purely as a completion signal (see `Drop`).
    fn new(executor: BackgroundExecutor) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                // Dropping the sender clone signals this future's completion.
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Release our own sender so only the spawned futures keep the
        // channel open.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.scheduler().block(
            None,
            async {
                self.rx.next().await;
            }
            .boxed(),
            None,
        );
    }
}

View File

@@ -78,6 +78,7 @@ pub mod colors;
mod element;
mod elements;
mod executor;
mod executor2;
mod geometry;
mod global;
mod input;

View File

@@ -40,15 +40,13 @@ use crate::{
DEFAULT_WINDOW_SIZE, DevicePixels, DispatchEventResult, Font, FontId, FontMetrics, FontRun,
ForegroundExecutor, GlyphId, GpuSpecs, ImageSource, Keymap, LineLayout, Pixels, PlatformInput,
Point, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Scene, ShapedGlyph,
ShapedRun, SharedString, Size, SvgRenderer, SvgSize, SystemWindowTab, Task, TaskLabel, Window,
ShapedRun, SharedString, Size, SvgRenderer, SvgSize, SystemWindowTab, Task, Window,
WindowControlArea, hash, point, px, size,
};
use anyhow::Result;
use async_task::Runnable;
use futures::channel::oneshot;
use image::codecs::gif::GifDecoder;
use image::{AnimationDecoder as _, Frame};
use parking::Unparker;
use raw_window_handle::{HasDisplayHandle, HasWindowHandle};
use schemars::JsonSchema;
use seahash::SeaHasher;
@@ -58,7 +56,6 @@ use std::borrow::Cow;
use std::hash::{Hash, Hasher};
use std::io::Cursor;
use std::ops;
use std::time::{Duration, Instant};
use std::{
fmt::{self, Debug},
ops::Range,
@@ -84,7 +81,7 @@ pub(crate) use test::*;
pub(crate) use windows::*;
#[cfg(any(test, feature = "test-support"))]
pub use test::{TestDispatcher, TestScreenCaptureSource, TestScreenCaptureStream};
pub use test::{TestScheduler, TestSchedulerConfig, TestScreenCaptureSource};
/// Returns a background executor for the current platform.
pub fn background_executor() -> BackgroundExecutor {
@@ -556,26 +553,6 @@ pub(crate) trait PlatformWindow: HasWindowHandle + HasDisplayHandle {
}
}
/// This type is public so that our test macro can generate and use it, but it should not
/// be considered part of our public API.
#[doc(hidden)]
pub trait PlatformDispatcher: Send + Sync {
    /// Whether the calling thread is the platform's main thread.
    fn is_main_thread(&self) -> bool;
    /// Schedule the runnable on a background thread. The label, when present,
    /// identifies the task in tests.
    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>);
    /// Schedule the runnable on the main thread.
    fn dispatch_on_main_thread(&self, runnable: Runnable);
    /// Schedule the runnable to run after the given delay has elapsed.
    fn dispatch_after(&self, duration: Duration, runnable: Runnable);
    /// Park the calling thread, optionally bounded by a timeout. Returns
    /// whether the thread was unparked (as opposed to the timeout elapsing).
    fn park(&self, timeout: Option<Duration>) -> bool;
    /// Handle that can wake a thread parked via [`Self::park`].
    fn unparker(&self) -> Unparker;
    /// Current time; test dispatchers override this to provide a fake clock.
    fn now(&self) -> Instant {
        Instant::now()
    }
    /// Downcast to the test dispatcher, if this is one.
    #[cfg(any(test, feature = "test-support"))]
    fn as_test(&self) -> Option<&TestDispatcher> {
        None
    }
}
pub(crate) trait PlatformTextSystem: Send + Sync {
fn add_fonts(&self, fonts: Vec<Cow<'static, [u8]>>) -> Result<()>;
fn all_font_names(&self) -> Vec<String>;

View File

@@ -1,10 +1,10 @@
//! Macos screen have a y axis that goings up from the bottom of the screen and
//! an origin at the bottom left of the main display.
mod dispatcher;
mod display;
mod display_link;
mod events;
mod keyboard;
mod scheduler;
#[cfg(feature = "screen-capture")]
mod screen_capture;
@@ -45,11 +45,11 @@ use std::{
ops::Range,
};
pub(crate) use dispatcher::*;
pub(crate) use display::*;
pub(crate) use display_link::*;
pub(crate) use keyboard::*;
pub(crate) use platform::*;
pub(crate) use scheduler::*;
pub(crate) use window::*;
#[cfg(feature = "font-kit")]

View File

@@ -1,107 +0,0 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
use crate::{PlatformDispatcher, TaskLabel};
use async_task::Runnable;
use objc::{
class, msg_send,
runtime::{BOOL, YES},
sel, sel_impl,
};
use parking::{Parker, Unparker};
use parking_lot::Mutex;
use std::{
ffi::c_void,
ptr::{NonNull, addr_of},
sync::Arc,
time::Duration,
};
/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
/// these pub items from leaking into public API.
pub(crate) mod dispatch_sys {
include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
}
use dispatch_sys::*;
// Returns GCD's main queue by taking the address of the `_dispatch_main_q`
// static exported by libdispatch (the C `dispatch_get_main_queue` is a macro
// that does the same, so it is re-created here for the generated bindings).
pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
    addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
}
// macOS implementation of `PlatformDispatcher`, backed by Grand Central
// Dispatch queues plus a `parking` pair for blocking the calling thread.
pub(crate) struct MacDispatcher {
    parker: Arc<Mutex<Parker>>,
}
impl Default for MacDispatcher {
fn default() -> Self {
Self::new()
}
}
impl MacDispatcher {
    // Creates a dispatcher with a fresh parker; the parker is shared behind
    // `Arc<Mutex<_>>` so `unparker()` handles remain valid across threads.
    pub fn new() -> Self {
        MacDispatcher {
            parker: Arc::new(Mutex::new(Parker::new())),
        }
    }
}
impl PlatformDispatcher for MacDispatcher {
    fn is_main_thread(&self) -> bool {
        // SAFETY: `+[NSThread isMainThread]` is a documented class method
        // taking no arguments and returning a BOOL.
        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
        is_main_thread == YES
    }

    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
        // Hand the runnable to a high-priority global GCD queue; `trampoline`
        // reconstructs and runs it on the other side.
        // SAFETY: `into_raw` transfers ownership of the runnable to GCD, which
        // invokes `trampoline` exactly once with the same pointer.
        unsafe {
            dispatch_async_f(
                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
                runnable.into_raw().as_ptr() as *mut c_void,
                Some(trampoline),
            );
        }
    }

    fn dispatch_on_main_thread(&self, runnable: Runnable) {
        // SAFETY: same ownership transfer as `dispatch`, targeting the main queue.
        unsafe {
            dispatch_async_f(
                dispatch_get_main_queue(),
                runnable.into_raw().as_ptr() as *mut c_void,
                Some(trampoline),
            );
        }
    }

    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
        // SAFETY: same ownership transfer as `dispatch`; GCD runs the
        // trampoline once the computed deadline is reached.
        unsafe {
            let queue =
                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
            let when = dispatch_time(DISPATCH_TIME_NOW as u64, duration.as_nanos() as i64);
            dispatch_after_f(
                when,
                queue,
                runnable.into_raw().as_ptr() as *mut c_void,
                Some(trampoline),
            );
        }
    }

    fn park(&self, timeout: Option<Duration>) -> bool {
        if let Some(timeout) = timeout {
            // Returns whether we were unparked before the timeout elapsed.
            self.parker.lock().park_timeout(timeout)
        } else {
            // Untimed park always ends via an unpark.
            self.parker.lock().park();
            true
        }
    }

    fn unparker(&self) -> Unparker {
        self.parker.lock().unparker()
    }
}
// C-ABI callback invoked by GCD: rebuilds the `Runnable` from the raw pointer
// produced by `Runnable::into_raw` in the dispatch methods above and runs it.
extern "C" fn trampoline(runnable: *mut c_void) {
    // SAFETY: `runnable` originates from `Runnable::into_raw` (non-null) and
    // GCD calls this exactly once per dispatch, so ownership is reclaimed
    // exactly once.
    let task = unsafe { Runnable::<()>::from_raw(NonNull::new_unchecked(runnable as *mut ())) };
    task.run();
}

View File

@@ -6,8 +6,8 @@ use super::{
};
use crate::{
Action, AnyWindowHandle, BackgroundExecutor, ClipboardEntry, ClipboardItem, ClipboardString,
CursorStyle, ForegroundExecutor, Image, ImageFormat, KeyContext, Keymap, MacDispatcher,
MacDisplay, MacWindow, Menu, MenuItem, OsMenu, OwnedMenu, PathPromptOptions, Platform,
CursorStyle, ForegroundExecutor, Image, ImageFormat, KeyContext, Keymap, MacDisplay,
MacScheduler, MacWindow, Menu, MenuItem, OsMenu, OwnedMenu, PathPromptOptions, Platform,
PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, PlatformTextSystem,
PlatformWindow, Result, SemanticVersion, SystemMenuType, Task, WindowAppearance, WindowParams,
hash,
@@ -183,7 +183,7 @@ impl Default for MacPlatform {
impl MacPlatform {
pub(crate) fn new(headless: bool) -> Self {
let dispatcher = Arc::new(MacDispatcher::new());
let scheduler = Arc::new(MacScheduler::new());
#[cfg(feature = "font-kit")]
let text_system = Arc::new(crate::MacTextSystem::new());
@@ -197,8 +197,8 @@ impl MacPlatform {
Self(Mutex::new(MacPlatformState {
headless,
text_system,
background_executor: BackgroundExecutor::new(dispatcher.clone()),
foreground_executor: ForegroundExecutor::new(dispatcher),
background_executor: scheduler.background(),
foreground_executor: scheduler.foreground(),
renderer_context: renderer::Context::default(),
pasteboard: unsafe { NSPasteboard::generalPasteboard(nil) },
text_hash_pasteboard_type: unsafe { ns_string("zed-text-hash") },
@@ -504,7 +504,7 @@ impl Platform for MacPlatform {
// this, we make quitting the application asynchronous so that we aren't holding borrows to
// the app state on the stack when we actually terminate the app.
use super::dispatcher::{dispatch_get_main_queue, dispatch_sys::dispatch_async_f};
use super::scheduler::{dispatch_get_main_queue, dispatch_sys::dispatch_async_f};
unsafe {
dispatch_async_f(dispatch_get_main_queue(), ptr::null_mut(), Some(quit));

View File

@@ -0,0 +1,121 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
use crate::{BackgroundExecutor, ForegroundExecutor};
use async_task::Runnable;
use futures::{
channel::oneshot,
future::{self, LocalBoxFuture},
};
use scheduler::{Clock, Scheduler, SessionId, Timer};
use std::{
ffi::c_void,
ptr::{NonNull, addr_of},
sync::{
Arc,
atomic::{AtomicU16, Ordering::SeqCst},
},
time::Duration,
};
/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
/// these pub items from leaking into public API.
pub(crate) mod dispatch_sys {
include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
}
use dispatch_sys::*;
pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
}
/// Schedules GPUI foreground/background work onto macOS Grand Central
/// Dispatch queues.
pub(crate) struct MacScheduler {
    // Monotonic source of `SessionId`s: each `ForegroundExecutor` created via
    // `foreground` gets its own session. Wraps silently at `u16::MAX`.
    next_session_id: AtomicU16,
}

// Consistency with `MacDispatcher` in this file, and satisfies clippy's
// `new_without_default` lint for types with a zero-argument `new`.
impl Default for MacScheduler {
    fn default() -> Self {
        Self::new()
    }
}

impl MacScheduler {
    /// Creates a scheduler; the first session id handed out will be 0.
    pub fn new() -> Self {
        MacScheduler {
            next_session_id: AtomicU16::new(0),
        }
    }

    /// Returns a background executor backed by this scheduler's global GCD
    /// queues.
    pub fn background(self: &Arc<Self>) -> BackgroundExecutor {
        BackgroundExecutor::new(scheduler::BackgroundExecutor::new(self.clone()))
    }

    /// Returns a foreground executor bound to a freshly-allocated session id.
    pub fn foreground(self: &Arc<Self>) -> ForegroundExecutor {
        let session_id = SessionId::new(self.next_session_id.fetch_add(1, SeqCst));
        ForegroundExecutor::new(scheduler::ForegroundExecutor::new(session_id, self.clone()))
    }
}
impl Scheduler for MacScheduler {
    fn block(
        &self,
        _session_id: Option<SessionId>,
        future: LocalBoxFuture<()>,
        timeout: Option<Duration>,
    ) {
        // Drive the future to completion on the current thread. With a
        // timeout, race it against a GCD-backed timer and return when either
        // side finishes; the caller detects a timeout by observing that the
        // future never produced its output.
        if let Some(timeout) = timeout {
            let timer = self.timer(timeout);
            futures::executor::block_on(future::select(timer, future));
        } else {
            futures::executor::block_on(future);
        }
    }

    fn schedule_foreground(&self, _session_id: SessionId, runnable: Runnable) {
        // All foreground sessions share the serial main queue, so ordering is
        // preserved without consulting the session id here.
        unsafe {
            dispatch_async_f(
                dispatch_get_main_queue(),
                runnable.into_raw().as_ptr() as *mut c_void,
                Some(trampoline),
            );
        }
    }

    fn schedule_background(&self, runnable: Runnable) {
        // Run on a high-priority concurrent global queue. Ownership of the
        // runnable travels through the raw pointer and is reclaimed by
        // `trampoline`.
        unsafe {
            dispatch_async_f(
                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
                runnable.into_raw().as_ptr() as *mut c_void,
                Some(trampoline),
            );
        }
    }

    fn timer(&self, timeout: Duration) -> Timer {
        // Build a one-shot task whose only job is to fire the oneshot sender.
        // Its schedule function defers execution via `dispatch_after_f`, so
        // the receiver (wrapped in `Timer`) resolves once `timeout` elapses.
        let (tx, rx) = oneshot::channel();
        let (runnable, task) = async_task::spawn(
            async move {
                tx.send(()).ok();
            },
            move |runnable: Runnable| unsafe {
                let queue =
                    dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
                // NOTE(review): `as_nanos() as i64` truncates for extremely
                // long durations (> ~292 years) — acceptable in practice.
                let when = dispatch_time(DISPATCH_TIME_NOW as u64, timeout.as_nanos() as i64);
                dispatch_after_f(
                    when,
                    queue,
                    runnable.into_raw().as_ptr() as *mut c_void,
                    Some(trampoline),
                );
            },
        );
        runnable.schedule();
        task.detach();
        Timer::new(rx)
    }

    fn clock(&self) -> Arc<dyn Clock> {
        // Not implemented for the macOS scheduler yet; calling this panics.
        todo!()
    }
}
/// C-ABI callback handed to GCD: reconstitutes the `Runnable` that was leaked
/// as a raw pointer by the scheduling methods above and polls it once.
extern "C" fn trampoline(runnable: *mut c_void) {
    // SAFETY: `runnable` was produced by `Runnable::into_raw` in this file
    // and is therefore non-null; ownership is taken back exactly once here.
    let task = unsafe { Runnable::<()>::from_raw(NonNull::new_unchecked(runnable as *mut ())) };
    task.run();
}

View File

@@ -1,11 +1,10 @@
mod dispatcher;
mod display;
mod platform;
mod window;
pub use dispatcher::*;
pub(crate) use display::*;
pub(crate) use platform::*;
pub use scheduler::{TestScheduler, TestSchedulerConfig};
pub(crate) use window::*;
pub use platform::{TestScreenCaptureSource, TestScreenCaptureStream};
pub use platform::TestScreenCaptureSource;

View File

@@ -1,312 +0,0 @@
use crate::{PlatformDispatcher, TaskLabel};
use async_task::Runnable;
use backtrace::Backtrace;
use collections::{HashMap, HashSet, VecDeque};
use parking::{Parker, Unparker};
use parking_lot::Mutex;
use rand::prelude::*;
use std::{
future::Future,
ops::RangeInclusive,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
use util::post_inc;
/// Identifies one simulated foreground "thread" within a `TestDispatcher`;
/// each clone of the dispatcher receives its own id (see `Clone` below).
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
struct TestDispatcherId(usize);
/// Deterministic dispatcher used in tests: runs all work on the current
/// thread, interleaving tasks pseudo-randomly and simulating time.
#[doc(hidden)]
pub struct TestDispatcher {
    // Which foreground queue this handle enqueues onto.
    id: TestDispatcherId,
    // Shared, lock-protected scheduler state (queues, RNG, virtual clock).
    state: Arc<Mutex<TestDispatcherState>>,
    parker: Arc<Mutex<Parker>>,
    unparker: Unparker,
}
struct TestDispatcherState {
    // Seeded RNG driving every scheduling decision, for reproducible runs.
    random: StdRng,
    // Per-clone FIFO queues of "main thread" tasks, keyed by dispatcher id.
    foreground: HashMap<TestDispatcherId, VecDeque<Runnable>>,
    // Ready background tasks, drained in random order.
    background: Vec<Runnable>,
    // Tasks whose label was `deprioritize`d; only run when all else is empty.
    deprioritized_background: Vec<Runnable>,
    // Pending timers, kept sorted by virtual deadline (see `dispatch_after`).
    delayed: Vec<(Duration, Runnable)>,
    // Real instant the dispatcher was created; `now` = start_time + time.
    start_time: Instant,
    // Virtual elapsed time since `start_time`.
    time: Duration,
    // Simulated main-thread flag, toggled around each runnable in `tick`.
    is_main_thread: bool,
    // Next id to hand out when the dispatcher is cloned.
    next_id: TestDispatcherId,
    // Whether `park` may legitimately block (tests opt in explicitly).
    allow_parking: bool,
    // Human-readable hint shown when a test appears to hang while waiting.
    waiting_hint: Option<String>,
    // Backtrace captured at `start_waiting`, resolved lazily on request.
    waiting_backtrace: Option<Backtrace>,
    // Labels whose tasks are routed to `deprioritized_background`.
    deprioritized_task_labels: HashSet<TaskLabel>,
    // Range from which `gen_block_on_ticks` samples (see `set_block_on_ticks`).
    block_on_ticks: RangeInclusive<usize>,
}
impl TestDispatcher {
    /// Creates a deterministic test dispatcher seeded with `random`; all
    /// scheduling decisions are drawn from this RNG so runs are reproducible
    /// from a seed.
    pub fn new(random: StdRng) -> Self {
        let (parker, unparker) = parking::pair();
        let state = TestDispatcherState {
            random,
            foreground: HashMap::default(),
            background: Vec::new(),
            deprioritized_background: Vec::new(),
            delayed: Vec::new(),
            time: Duration::ZERO,
            start_time: Instant::now(),
            is_main_thread: true,
            next_id: TestDispatcherId(1),
            allow_parking: false,
            waiting_hint: None,
            waiting_backtrace: None,
            deprioritized_task_labels: Default::default(),
            block_on_ticks: 0..=1000,
        };
        TestDispatcher {
            id: TestDispatcherId(0),
            state: Arc::new(Mutex::new(state)),
            parker: Arc::new(Mutex::new(parker)),
            unparker,
        }
    }

    /// Advances the virtual clock by `by`, stepping through pending timer
    /// deadlines one at a time and running all ready work at each step.
    pub fn advance_clock(&self, by: Duration) {
        let new_now = self.state.lock().time + by;
        loop {
            // Drain everything runnable at the current virtual time before
            // jumping to the next timer deadline.
            self.run_until_parked();
            let state = self.state.lock();
            let next_due_time = state.delayed.first().map(|(time, _)| *time);
            drop(state);
            if let Some(due_time) = next_due_time
                && due_time <= new_now
            {
                self.state.lock().time = due_time;
                continue;
            }
            break;
        }
        self.state.lock().time = new_now;
    }

    /// Jumps the virtual clock straight to the earliest pending timer
    /// deadline. Returns false when no timers are pending.
    pub fn advance_clock_to_next_delayed(&self) -> bool {
        let next_due_time = self.state.lock().delayed.first().map(|(time, _)| *time);
        if let Some(next_due_time) = next_due_time {
            self.state.lock().time = next_due_time;
            return true;
        }
        false
    }

    /// Returns a future that yields to the scheduler a random (0..10) number
    /// of times before completing, to shake out ordering assumptions.
    pub fn simulate_random_delay(&self) -> impl 'static + Send + Future<Output = ()> + use<> {
        struct YieldNow {
            pub(crate) count: usize,
        }
        impl Future for YieldNow {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                if self.count > 0 {
                    self.count -= 1;
                    // Re-wake immediately so the task is re-queued rather
                    // than completed, forcing an extra scheduler round-trip.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
        }
        YieldNow {
            count: self.state.lock().random.random_range(0..10),
        }
    }

    /// Runs one randomly-chosen ready task. Returns false when nothing is
    /// runnable. With `background_only`, foreground queues are ignored.
    pub fn tick(&self, background_only: bool) -> bool {
        let mut state = self.state.lock();
        // Promote every timer whose deadline has passed into the background
        // queue before choosing what to run.
        while let Some((deadline, _)) = state.delayed.first() {
            if *deadline > state.time {
                break;
            }
            let (_, runnable) = state.delayed.remove(0);
            state.background.push(runnable);
        }
        let foreground_len: usize = if background_only {
            0
        } else {
            state
                .foreground
                .values()
                .map(|runnables| runnables.len())
                .sum()
        };
        let background_len = state.background.len();
        let runnable;
        let main_thread;
        if foreground_len == 0 && background_len == 0 {
            // Deprioritized tasks only run when everything else is drained.
            let deprioritized_background_len = state.deprioritized_background.len();
            if deprioritized_background_len == 0 {
                return false;
            }
            let ix = state.random.random_range(0..deprioritized_background_len);
            main_thread = false;
            runnable = state.deprioritized_background.swap_remove(ix);
        } else {
            // Pick foreground vs. background with probability proportional to
            // each queue family's size, so neither starves.
            main_thread = state.random.random_ratio(
                foreground_len as u32,
                (foreground_len + background_len) as u32,
            );
            if main_thread {
                // Choose a random non-empty foreground queue, but pop from
                // its front to preserve per-queue FIFO ordering.
                let state = &mut *state;
                runnable = state
                    .foreground
                    .values_mut()
                    .filter(|runnables| !runnables.is_empty())
                    .choose(&mut state.random)
                    .unwrap()
                    .pop_front()
                    .unwrap();
            } else {
                let ix = state.random.random_range(0..background_len);
                runnable = state.background.swap_remove(ix);
            };
        };
        // Run the task with the simulated main-thread flag set accordingly,
        // restoring the previous value afterwards (ticks can nest via block).
        let was_main_thread = state.is_main_thread;
        state.is_main_thread = main_thread;
        drop(state);
        runnable.run();
        self.state.lock().is_main_thread = was_main_thread;
        true
    }

    /// Routes all future tasks spawned with `task_label` to the
    /// lowest-priority queue, simulating slow background work.
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.state
            .lock()
            .deprioritized_task_labels
            .insert(task_label);
    }

    /// Runs tasks until no (non-deprioritized or deprioritized) work remains.
    pub fn run_until_parked(&self) {
        while self.tick(false) {}
    }

    /// Whether `park` is allowed to block (see `allow_parking`).
    pub fn parking_allowed(&self) -> bool {
        self.state.lock().allow_parking
    }

    /// Permits blocking in `park`; tests that await real external events opt
    /// in with this.
    pub fn allow_parking(&self) {
        self.state.lock().allow_parking = true
    }

    /// Reverts `allow_parking`.
    pub fn forbid_parking(&self) {
        self.state.lock().allow_parking = false
    }

    /// Sets a human-readable hint reported when a test hangs while waiting.
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.state.lock().waiting_hint = msg
    }

    /// Returns the current waiting hint, if any.
    pub fn waiting_hint(&self) -> Option<String> {
        self.state.lock().waiting_hint.clone()
    }

    /// Captures an unresolved backtrace marking where waiting began; cheap
    /// until resolved by `waiting_backtrace`.
    pub fn start_waiting(&self) {
        self.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
    }

    /// Clears any backtrace captured by `start_waiting`.
    pub fn finish_waiting(&self) {
        self.state.lock().waiting_backtrace.take();
    }

    /// Takes and resolves the captured waiting backtrace, if one exists.
    pub fn waiting_backtrace(&self) -> Option<Backtrace> {
        self.state.lock().waiting_backtrace.take().map(|mut b| {
            b.resolve();
            b
        })
    }

    /// Returns a clone of the dispatcher's RNG (same seed state).
    pub fn rng(&self) -> StdRng {
        self.state.lock().random.clone()
    }

    /// Configures the range sampled by `gen_block_on_ticks`.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.state.lock().block_on_ticks = range;
    }

    /// Samples how many ticks a `block_on` should run before giving up,
    /// drawn from the configured range.
    pub fn gen_block_on_ticks(&self) -> usize {
        let mut lock = self.state.lock();
        let block_on_ticks = lock.block_on_ticks.clone();
        lock.random.random_range(block_on_ticks)
    }
}
impl Clone for TestDispatcher {
fn clone(&self) -> Self {
let id = post_inc(&mut self.state.lock().next_id.0);
Self {
id: TestDispatcherId(id),
state: self.state.clone(),
parker: self.parker.clone(),
unparker: self.unparker.clone(),
}
}
}
impl PlatformDispatcher for TestDispatcher {
    fn is_main_thread(&self) -> bool {
        // Reports the simulated main-thread flag maintained by `tick`, not
        // the real OS thread identity.
        self.state.lock().is_main_thread
    }

    fn now(&self) -> Instant {
        // Real creation instant plus the virtual elapsed duration.
        let state = self.state.lock();
        state.start_time + state.time
    }

    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
        // Tasks whose label has been `deprioritize`d go to a queue that only
        // runs once all other work is exhausted.
        {
            let mut state = self.state.lock();
            if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) {
                state.deprioritized_background.push(runnable);
            } else {
                state.background.push(runnable);
            }
        }
        self.unparker.unpark();
    }

    fn dispatch_on_main_thread(&self, runnable: Runnable) {
        // Enqueue onto this handle's own foreground queue (keyed by clone
        // id) so per-handle FIFO ordering is preserved.
        self.state
            .lock()
            .foreground
            .entry(self.id)
            .or_default()
            .push_back(runnable);
        self.unparker.unpark();
    }

    fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) {
        // Insert in virtual-deadline order; `Ok` and `Err` from the binary
        // search both yield a valid insertion point.
        let mut state = self.state.lock();
        let next_time = state.time + duration;
        let ix = match state.delayed.binary_search_by_key(&next_time, |e| e.0) {
            Ok(ix) | Err(ix) => ix,
        };
        state.delayed.insert(ix, (next_time, runnable));
    }

    fn park(&self, _: Option<std::time::Duration>) -> bool {
        // The timeout is deliberately ignored in tests: park until unparked.
        self.parker.lock().park();
        true
    }

    fn unparker(&self) -> Unparker {
        self.unparker.clone()
    }

    fn as_test(&self) -> Option<&TestDispatcher> {
        Some(self)
    }
}

View File

@@ -133,7 +133,6 @@ impl TestPlatform {
.new_path
.pop_front()
.expect("no pending new path prompt");
self.background_executor().set_waiting_hint(None);
tx.send(Ok(select_path(&path))).ok();
}
@@ -145,7 +144,6 @@ impl TestPlatform {
.multiple_choice
.pop_front()
.expect("no pending multiple choice prompt");
self.background_executor().set_waiting_hint(None);
let Some(ix) = prompt.answers.iter().position(|a| a == response) else {
panic!(
"PROMPT: {}\n{:?}\n{:?}\nCannot respond with {}",
@@ -180,8 +178,6 @@ impl TestPlatform {
) -> oneshot::Receiver<usize> {
let (tx, rx) = oneshot::channel();
let answers: Vec<String> = answers.iter().map(|s| s.label().to_string()).collect();
self.background_executor()
.set_waiting_hint(Some(format!("PROMPT: {:?} {:?}", msg, detail)));
self.prompts
.borrow_mut()
.multiple_choice
@@ -344,8 +340,6 @@ impl Platform for TestPlatform {
_suggested_name: Option<&str>,
) -> oneshot::Receiver<Result<Option<std::path::PathBuf>>> {
let (tx, rx) = oneshot::channel();
self.background_executor()
.set_waiting_hint(Some(format!("PROMPT FOR PATH: {:?}", directory)));
self.prompts
.borrow_mut()
.new_path

View File

@@ -40,6 +40,7 @@ pub(crate) struct WindowsPlatform {
drop_target_helper: IDropTargetHelper,
handle: HWND,
disable_direct_composition: bool,
main_thread_id: std::thread::ThreadId,
}
struct WindowsPlatformInner {
@@ -130,6 +131,7 @@ impl WindowsPlatform {
};
let inner = context.inner.take().unwrap()?;
let handle = result?;
let main_thread_id = std::thread::current().id();
let dispatcher = Arc::new(WindowsDispatcher::new(
main_sender,
handle,
@@ -158,6 +160,7 @@ impl WindowsPlatform {
disable_direct_composition,
windows_version,
drop_target_helper,
main_thread_id,
})
}
@@ -270,6 +273,10 @@ impl WindowsPlatform {
}
impl Platform for WindowsPlatform {
fn is_main_thread(&self) -> bool {
std::thread::current().id() == self.main_thread_id
}
fn background_executor(&self) -> BackgroundExecutor {
self.background_executor.clone()
}

View File

@@ -25,14 +25,15 @@
//! assert!(true)
//! }
//! ```
use crate::{Entity, Subscription, TestAppContext, TestDispatcher};
use crate::{Entity, Subscription, TestAppContext};
use futures::StreamExt as _;
use rand::prelude::*;
use scheduler::{TestScheduler, TestSchedulerConfig};
use smol::channel;
use std::{
env,
panic::{self, RefUnwindSafe},
pin::Pin,
sync::Arc,
};
/// Run the given test function with the configured parameters.
@@ -42,7 +43,7 @@ pub fn run_test(
num_iterations: usize,
explicit_seeds: &[u64],
max_retries: usize,
test_fn: &mut (dyn RefUnwindSafe + Fn(TestDispatcher, u64)),
test_fn: &mut (dyn RefUnwindSafe + Fn(Arc<TestScheduler>, u64)),
on_fail_fn: Option<fn()>,
) {
let (seeds, is_multiple_runs) = calculate_seeds(num_iterations as u64, explicit_seeds);
@@ -54,8 +55,8 @@ pub fn run_test(
eprintln!("seed = {seed}");
}
let result = panic::catch_unwind(|| {
let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(seed));
test_fn(dispatcher, seed);
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed)));
test_fn(scheduler, seed);
});
match result {

View File

@@ -317,16 +317,14 @@ impl Boundary {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
Font, FontFeatures, FontStyle, FontWeight, Hsla, TestAppContext, TestDispatcher, font,
};
use crate::{Font, FontFeatures, FontStyle, FontWeight, Hsla, TestAppContext, font};
#[cfg(target_os = "macos")]
use crate::{TextRun, WindowTextSystem, WrapBoundary};
use rand::prelude::*;
use scheduler::{TestScheduler, TestSchedulerConfig};
fn build_wrapper() -> LineWrapper {
let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
let cx = TestAppContext::build(dispatcher, None);
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(0)));
let cx = TestAppContext::build(scheduler, None);
let id = cx.text_system().resolve_font(&font(".ZedMono"));
LineWrapper::new(id, px(16.), cx.text_system().platform_text_system.clone())
}

View File

@@ -1,4 +1,4 @@
use crate::{BackgroundExecutor, Task};
use crate::BackgroundExecutor;
use std::{
future::Future,
pin::Pin,
@@ -7,6 +7,7 @@ use std::{
time::Duration,
};
use scheduler::Timer;
pub use util::*;
/// A helper trait for building complex objects with imperative conditionals in a fluent style.
@@ -85,7 +86,7 @@ impl<T: Future> FutureExt for T {
pub struct WithTimeout<T> {
future: T,
timer: Task<()>,
timer: Timer,
}
#[derive(Debug, thiserror::Error)]

View File

@@ -146,7 +146,7 @@ fn generate_test_function(
}
Some("BackgroundExecutor") => {
inner_fn_args.extend(quote!(gpui::BackgroundExecutor::new(
std::sync::Arc::new(dispatcher.clone()),
scheduler.background()
),));
continue;
}
@@ -162,15 +162,14 @@ fn generate_test_function(
let cx_varname = format_ident!("cx_{}", ix);
cx_vars.extend(quote!(
let mut #cx_varname = gpui::TestAppContext::build(
dispatcher.clone(),
scheduler.clone(),
Some(stringify!(#outer_fn_name)),
);
));
cx_teardowns.extend(quote!(
dispatcher.run_until_parked();
#cx_varname.executor().forbid_parking();
scheduler.run();
#cx_varname.quit();
dispatcher.run_until_parked();
scheduler.run();
));
inner_fn_args.extend(quote!(&mut #cx_varname,));
continue;
@@ -190,10 +189,9 @@ fn generate_test_function(
#num_iterations,
&[#seeds],
#max_retries,
&mut |dispatcher, _seed| {
let executor = gpui::BackgroundExecutor::new(std::sync::Arc::new(dispatcher.clone()));
&mut |scheduler, _seed| {
#cx_vars
executor.block_test(#inner_fn_name(#inner_fn_args));
scheduler.foreground().block_on(#inner_fn_name(#inner_fn_args));
#cx_teardowns
},
#on_failure_fn_name
@@ -225,33 +223,33 @@ fn generate_test_function(
let cx_varname_lock = format_ident!("cx_{}_lock", ix);
cx_vars.extend(quote!(
let mut #cx_varname = gpui::TestAppContext::build(
dispatcher.clone(),
scheduler.clone(),
Some(stringify!(#outer_fn_name))
);
let mut #cx_varname_lock = #cx_varname.app.borrow_mut();
));
inner_fn_args.extend(quote!(&mut #cx_varname_lock,));
cx_teardowns.extend(quote!(
drop(#cx_varname_lock);
dispatcher.run_until_parked();
#cx_varname.update(|cx| { cx.background_executor().forbid_parking(); cx.quit(); });
dispatcher.run_until_parked();
));
drop(#cx_varname_lock);
scheduler.run();
#cx_varname.update(|cx| { scheduler.forbid_parking(); cx.quit(); });
scheduler.run();
));
continue;
}
Some("TestAppContext") => {
let cx_varname = format_ident!("cx_{}", ix);
cx_vars.extend(quote!(
let mut #cx_varname = gpui::TestAppContext::build(
dispatcher.clone(),
scheduler.clone(),
Some(stringify!(#outer_fn_name))
);
));
cx_teardowns.extend(quote!(
dispatcher.run_until_parked();
#cx_varname.executor().forbid_parking();
scheduler.run();
scheduler.forbid_parking();
#cx_varname.quit();
dispatcher.run_until_parked();
scheduler.run();
));
inner_fn_args.extend(quote!(&mut #cx_varname,));
continue;
@@ -273,7 +271,7 @@ fn generate_test_function(
#num_iterations,
&[#seeds],
#max_retries,
&mut |dispatcher, _seed| {
&mut |scheduler, _seed| {
#cx_vars
#inner_fn_name(#inner_fn_args);
#cx_teardowns

View File

@@ -25,7 +25,7 @@ use fs::MTime;
use futures::channel::oneshot;
use gpui::{
App, AppContext as _, Context, Entity, EventEmitter, HighlightStyle, SharedString, StyledText,
Task, TaskLabel, TextStyle,
Task, TextStyle,
};
use lsp::{LanguageServerId, NumberOrString};
use parking_lot::Mutex;
@@ -49,7 +49,7 @@ use std::{
ops::{Deref, Range},
path::{Path, PathBuf},
rc,
sync::{Arc, LazyLock},
sync::Arc,
time::{Duration, Instant},
vec,
};
@@ -72,10 +72,6 @@ pub use {tree_sitter_python, tree_sitter_rust, tree_sitter_typescript};
pub use lsp::DiagnosticSeverity;
/// A label for the background task spawned by the buffer to compute
/// a diff against the contents of its file.
pub static BUFFER_DIFF_TASK: LazyLock<TaskLabel> = LazyLock::new(TaskLabel::new);
/// Indicate whether a [`Buffer`] has permissions to edit.
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum Capability {
@@ -1519,7 +1515,7 @@ impl Buffer {
self.parse_status.0.send(ParseStatus::Parsing).unwrap();
match cx
.background_executor()
.foreground_executor()
.block_with_timeout(self.sync_parse_timeout, parse_task)
{
Ok(new_syntax_snapshot) => {
@@ -1607,7 +1603,7 @@ impl Buffer {
if let Some(indent_sizes) = self.compute_autoindents() {
let indent_sizes = cx.background_spawn(indent_sizes);
match cx
.background_executor()
.foreground_executor()
.block_with_timeout(Duration::from_micros(500), indent_sizes)
{
Ok(indent_sizes) => self.apply_autoindents(indent_sizes, cx),
@@ -1867,18 +1863,17 @@ impl Buffer {
pub fn diff(&self, mut new_text: String, cx: &App) -> Task<Diff> {
let old_text = self.as_rope().clone();
let base_version = self.version();
cx.background_executor()
.spawn_labeled(*BUFFER_DIFF_TASK, async move {
let old_text = old_text.to_string();
let line_ending = LineEnding::detect(&new_text);
LineEnding::normalize(&mut new_text);
let edits = text_diff(&old_text, &new_text);
Diff {
base_version,
line_ending,
edits,
}
})
cx.background_executor().spawn(async move {
let old_text = old_text.to_string();
let line_ending = LineEnding::detect(&new_text);
LineEnding::normalize(&mut new_text);
let edits = text_diff(&old_text, &new_text);
Diff {
base_version,
line_ending,
edits,
}
})
}
/// Spawns a background task that searches the buffer for any whitespace

View File

@@ -2765,8 +2765,8 @@ fn test_serialization(cx: &mut gpui::App) {
let state = buffer1.read(cx).to_proto(cx);
let ops = cx
.background_executor()
.block(buffer1.read(cx).serialize_ops(None, cx));
.foreground_executor()
.block_on(buffer1.read(cx).serialize_ops(None, cx));
let buffer2 = cx.new(|cx| {
let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap();
buffer.apply_ops(
@@ -3098,8 +3098,8 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
let buffer = cx.new(|cx| {
let state = base_buffer.read(cx).to_proto(cx);
let ops = cx
.background_executor()
.block(base_buffer.read(cx).serialize_ops(None, cx));
.foreground_executor()
.block_on(base_buffer.read(cx).serialize_ops(None, cx));
let mut buffer =
Buffer::from_proto(i as ReplicaId, Capability::ReadWrite, state, None).unwrap();
buffer.apply_ops(
@@ -3208,8 +3208,8 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
50..=59 if replica_ids.len() < max_peers => {
let old_buffer_state = buffer.read(cx).to_proto(cx);
let old_buffer_ops = cx
.background_executor()
.block(buffer.read(cx).serialize_ops(None, cx));
.foreground_executor()
.block_on(buffer.read(cx).serialize_ops(None, cx));
let new_replica_id = (0..=replica_ids.len() as ReplicaId)
.filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
.choose(&mut rng)

View File

@@ -988,8 +988,8 @@ mod tests {
// Validate that all models are supported by tiktoken-rs
for model in Model::iter() {
let count = cx
.executor()
.block(count_open_ai_tokens(
.foreground_executor()
.block_on(count_open_ai_tokens(
request.clone(),
model,
&cx.app.borrow(),

View File

@@ -1703,13 +1703,11 @@ impl FakeLanguageServer {
T: request::Request,
T::Result: 'static + Send,
{
self.server.executor.start_waiting();
self.server.request::<T>(params).await
}
/// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
self.server.executor.start_waiting();
self.try_receive_notification::<T>().await.unwrap()
}

View File

@@ -74,8 +74,8 @@ fn test_remote(cx: &mut App) {
let guest_buffer = cx.new(|cx| {
let state = host_buffer.read(cx).to_proto(cx);
let ops = cx
.background_executor()
.block(host_buffer.read(cx).serialize_ops(None, cx));
.foreground_executor()
.block_on(host_buffer.read(cx).serialize_ops(None, cx));
let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap();
buffer.apply_ops(
ops.into_iter()

View File

@@ -65,7 +65,7 @@ pub async fn load_direnv_environment(
let output = String::from_utf8_lossy(&direnv_output.stdout);
if output.is_empty() {
// direnv outputs nothing when it has no changes to apply to environment variables
return Ok(HashMap::new());
return Ok(HashMap::default());
}
match serde_json::from_str(&output) {

View File

@@ -1148,7 +1148,7 @@ impl SettingsObserver {
) -> Task<()> {
let mut user_tasks_file_rx =
watch_config_file(cx.background_executor(), fs, file_path.clone());
let user_tasks_content = cx.background_executor().block(user_tasks_file_rx.next());
let user_tasks_content = cx.foreground_executor().block_on(user_tasks_file_rx.next());
let weak_entry = cx.weak_entity();
cx.spawn(async move |settings_observer, cx| {
let Ok(task_store) = settings_observer.read_with(cx, |settings_observer, _| {
@@ -1203,7 +1203,7 @@ impl SettingsObserver {
) -> Task<()> {
let mut user_tasks_file_rx =
watch_config_file(cx.background_executor(), fs, file_path.clone());
let user_tasks_content = cx.background_executor().block(user_tasks_file_rx.next());
let user_tasks_content = cx.foreground_executor().block_on(user_tasks_file_rx.next());
let weak_entry = cx.weak_entity();
cx.spawn(async move |settings_observer, cx| {
let Ok(task_store) = settings_observer.read_with(cx, |settings_observer, _| {

View File

@@ -6,8 +6,7 @@ use crate::{
};
use async_trait::async_trait;
use buffer_diff::{
BufferDiffEvent, CALCULATE_DIFF_TASK, DiffHunkSecondaryStatus, DiffHunkStatus,
DiffHunkStatusKind, assert_hunks,
BufferDiffEvent, DiffHunkSecondaryStatus, DiffHunkStatus, DiffHunkStatusKind, assert_hunks,
};
use fs::FakeFs;
use futures::{StreamExt, future};
@@ -193,8 +192,8 @@ async fn test_editorconfig_support(cx: &mut gpui::TestAppContext) {
.languages()
.language_for_file_path(file.path.as_ref());
let file_language = cx
.background_executor()
.block(file_language)
.foreground_executor()
.block_on(file_language)
.expect("Failed to get file language");
let file = file as _;
language_settings(Some(file_language.name()), Some(&file), cx).into_owned()
@@ -3922,7 +3921,7 @@ async fn test_save_file_spawns_language_server(cx: &mut gpui::TestAppContext) {
});
}
#[gpui::test(iterations = 30)]
#[gpui::test(iterations = 100)]
async fn test_file_changes_multiple_times_on_disk(cx: &mut gpui::TestAppContext) {
init_test(cx);
@@ -3942,10 +3941,6 @@ async fn test_file_changes_multiple_times_on_disk(cx: &mut gpui::TestAppContext)
.await
.unwrap();
// Simulate buffer diffs being slow, so that they don't complete before
// the next file change occurs.
cx.executor().deprioritize(*language::BUFFER_DIFF_TASK);
// Change the buffer's file on disk, and then wait for the file change
// to be detected by the worktree, so that the buffer starts reloading.
fs.save(
@@ -3977,7 +3972,7 @@ async fn test_file_changes_multiple_times_on_disk(cx: &mut gpui::TestAppContext)
});
}
#[gpui::test(iterations = 30)]
#[gpui::test(iterations = 100)]
async fn test_edit_buffer_while_it_reloads(cx: &mut gpui::TestAppContext) {
init_test(cx);
@@ -3997,10 +3992,6 @@ async fn test_edit_buffer_while_it_reloads(cx: &mut gpui::TestAppContext) {
.await
.unwrap();
// Simulate buffer diffs being slow, so that they don't complete before
// the next file change occurs.
cx.executor().deprioritize(*language::BUFFER_DIFF_TASK);
// Change the buffer's file on disk, and then wait for the file change
// to be detected by the worktree, so that the buffer starts reloading.
fs.save(
@@ -7655,21 +7646,12 @@ async fn test_staging_hunks_with_delayed_fs_event(cx: &mut gpui::TestAppContext)
});
}
#[gpui::test(iterations = 25)]
async fn test_staging_random_hunks(
mut rng: StdRng,
executor: BackgroundExecutor,
cx: &mut gpui::TestAppContext,
) {
#[gpui::test(iterations = 100)]
async fn test_staging_random_hunks(mut rng: StdRng, cx: &mut gpui::TestAppContext) {
let operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(20);
// Try to induce races between diff recalculation and index writes.
if rng.random_bool(0.5) {
executor.deprioritize(*CALCULATE_DIFF_TASK);
}
use DiffHunkSecondaryStatus::*;
init_test(cx);

View File

@@ -63,7 +63,7 @@ impl ProjectSymbolsDelegate {
fn filter(&mut self, query: &str, window: &mut Window, cx: &mut Context<Picker<Self>>) {
const MAX_MATCHES: usize = 100;
let mut visible_matches = cx.background_executor().block(fuzzy::match_strings(
let mut visible_matches = cx.foreground_executor().block_on(fuzzy::match_strings(
&self.visible_match_candidates,
query,
false,
@@ -72,7 +72,7 @@ impl ProjectSymbolsDelegate {
&Default::default(),
cx.background_executor().clone(),
));
let mut external_matches = cx.background_executor().block(fuzzy::match_strings(
let mut external_matches = cx.foreground_executor().block_on(fuzzy::match_strings(
&self.external_match_candidates,
query,
false,

View File

@@ -957,8 +957,8 @@ pub fn handle_settings_file_changes(
settings_changed: impl Fn(Option<anyhow::Error>, &mut App) + 'static,
) {
let server_settings_content = cx
.background_executor()
.block(server_settings_file.next())
.foreground_executor()
.block_on(server_settings_file.next())
.unwrap();
SettingsStore::update_global(cx, |store, cx| {
store

View File

@@ -16,6 +16,7 @@ doctest = false
alacritty_terminal.workspace = true
anyhow.workspace = true
async-dispatcher.workspace = true
async-task.workspace = true
async-tungstenite = { workspace = true, features = ["tokio", "tokio-rustls-manual-roots"] }
base64.workspace = true
client.workspace = true

View File

@@ -11,7 +11,7 @@ mod session;
use std::{sync::Arc, time::Duration};
use async_dispatcher::{Dispatcher, Runnable, set_dispatcher};
use gpui::{App, PlatformDispatcher};
use gpui::{App, Scheduler};
use project::Fs;
pub use runtimelib::ExecutionState;
use settings::Settings as _;
@@ -37,7 +37,7 @@ pub fn init(fs: Arc<dyn Fs>, cx: &mut App) {
fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
struct ZedDispatcher {
dispatcher: Arc<dyn PlatformDispatcher>,
scheduler: Arc<dyn Scheduler>,
}
// PlatformDispatcher is _super_ close to the same interface we put in
@@ -46,15 +46,25 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
// other crates in Zed.
impl Dispatcher for ZedDispatcher {
fn dispatch(&self, runnable: Runnable) {
self.dispatcher.dispatch(runnable, None)
self.scheduler.schedule_background(runnable)
}
fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
self.dispatcher.dispatch_after(duration, runnable);
let timer = self.scheduler.timer(duration);
let future = async move {
timer.await;
runnable.run();
};
let (runnable, task) = async_task::spawn(future, {
let scheduler = self.scheduler.clone();
move |runnable| scheduler.schedule_background(runnable)
});
runnable.schedule();
task.detach();
}
}
ZedDispatcher {
dispatcher: cx.background_executor().dispatcher.clone(),
scheduler: cx.background_executor().scheduler().clone(),
}
}

View File

@@ -17,9 +17,9 @@ test-support = []
[dependencies]
async-task.workspace = true
backtrace.workspace = true
chrono.workspace = true
futures.workspace = true
parking.workspace = true
parking_lot.workspace = true
rand.workspace = true
workspace-hack.workspace = true

View File

@@ -1,34 +1,42 @@
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use std::time::{Duration, Instant};
pub trait Clock {
fn now(&self) -> DateTime<Utc>;
fn utc_now(&self) -> DateTime<Utc>;
fn now(&self) -> Instant;
}
pub struct TestClock {
now: Mutex<DateTime<Utc>>,
pub struct TestClock(Mutex<TestClockState>);
struct TestClockState {
now: Instant,
utc_now: DateTime<Utc>,
}
impl TestClock {
pub fn new() -> Self {
const START_TIME: &str = "2025-07-01T23:59:58-00:00";
let now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc();
Self {
now: Mutex::new(now),
}
}
pub fn set_now(&self, now: DateTime<Utc>) {
*self.now.lock() = now;
let utc_now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc();
Self(Mutex::new(TestClockState {
now: Instant::now(),
utc_now,
}))
}
pub fn advance(&self, duration: Duration) {
*self.now.lock() += duration;
let mut state = self.0.lock();
state.now += duration;
state.utc_now += duration;
}
}
impl Clock for TestClock {
fn now(&self) -> DateTime<Utc> {
*self.now.lock()
fn utc_now(&self) -> DateTime<Utc> {
self.0.lock().utc_now
}
fn now(&self) -> Instant {
self.0.lock().now
}
}

View File

@@ -1,11 +1,15 @@
use crate::{Scheduler, SessionId, Timer};
use futures::FutureExt as _;
use std::{
future::Future,
marker::PhantomData,
mem::ManuallyDrop,
panic::Location,
pin::Pin,
rc::Rc,
sync::Arc,
task::{Context, Poll},
thread::{self, ThreadId},
time::Duration,
};
@@ -17,6 +21,15 @@ pub struct ForegroundExecutor {
}
impl ForegroundExecutor {
pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
Self {
session_id,
scheduler,
not_send: PhantomData,
}
}
#[track_caller]
pub fn spawn<F>(&self, future: F) -> Task<F::Output>
where
F: Future + 'static,
@@ -24,43 +37,52 @@ impl ForegroundExecutor {
{
let session_id = self.session_id;
let scheduler = Arc::clone(&self.scheduler);
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
let (runnable, task) = spawn_local_with_source_location(future, move |runnable| {
scheduler.schedule_foreground(session_id, runnable);
});
runnable.schedule();
Task(TaskState::Spawned(task))
}
pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
let mut output = None;
self.scheduler.block(
Some(self.session_id),
async { output = Some(future.await) }.boxed_local(),
None,
);
output.unwrap()
}
pub fn block_with_timeout<Fut: Unpin + Future>(
&self,
timeout: Duration,
mut future: Fut,
) -> Result<Fut::Output, Fut> {
let mut output = None;
self.scheduler.block(
Some(self.session_id),
async { output = Some((&mut future).await) }.boxed_local(),
Some(timeout),
);
output.ok_or(future)
}
pub fn timer(&self, duration: Duration) -> Timer {
self.scheduler.timer(duration)
}
}
impl ForegroundExecutor {
pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
assert!(
scheduler.is_main_thread(),
"ForegroundExecutor must be created on the same thread as the Scheduler"
);
Self {
session_id,
scheduler,
not_send: PhantomData,
}
}
#[derive(Clone)]
pub struct BackgroundExecutor {
scheduler: Arc<dyn Scheduler>,
}
impl BackgroundExecutor {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
}
pub struct BackgroundExecutor {
scheduler: Arc<dyn Scheduler>,
}
impl BackgroundExecutor {
pub fn spawn<F>(&self, future: F) -> Task<F::Output>
where
F: Future + Send + 'static,
@@ -74,21 +96,13 @@ impl BackgroundExecutor {
Task(TaskState::Spawned(task))
}
pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
self.scheduler.block_on(future)
}
pub fn block_with_timeout<Fut: Unpin + Future>(
&self,
future: &mut Fut,
timeout: Duration,
) -> Option<Fut::Output> {
self.scheduler.block_with_timeout(future, timeout)
}
pub fn timer(&self, duration: Duration) -> Timer {
self.scheduler.timer(duration)
}
pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
&self.scheduler
}
}
/// Task is a primitive that allows work to happen in the background.
@@ -116,6 +130,13 @@ impl<T> Task<T> {
Task(TaskState::Ready(Some(val)))
}
pub fn is_ready(&self) -> bool {
match &self.0 {
TaskState::Ready(_) => true,
TaskState::Spawned(task) => task.is_finished(),
}
}
/// Detaching a task runs it to completion in the background
pub fn detach(self) {
match self {
@@ -135,3 +156,68 @@ impl<T> Future for Task<T> {
}
}
}
/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S>(
future: Fut,
schedule: S,
) -> (async_task::Runnable, async_task::Task<Fut::Output, ()>)
where
Fut: Future + 'static,
Fut::Output: 'static,
S: async_task::Schedule + Send + Sync + 'static,
{
#[inline]
fn thread_id() -> ThreadId {
std::thread_local! {
static ID: ThreadId = thread::current().id();
}
ID.try_with(|id| *id)
.unwrap_or_else(|_| thread::current().id())
}
struct Checked<F> {
id: ThreadId,
inner: ManuallyDrop<F>,
location: &'static Location<'static>,
}
impl<F> Drop for Checked<F> {
fn drop(&mut self) {
assert!(
self.id == thread_id(),
"local task dropped by a thread that didn't spawn it. Task spawned at {}",
self.location
);
unsafe {
ManuallyDrop::drop(&mut self.inner);
}
}
}
impl<F: Future> Future for Checked<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(
self.id == thread_id(),
"local task polled by a thread that didn't spawn it. Task spawned at {}",
self.location
);
unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
}
}
// Wrap the future into one that checks which thread it's on.
let future = Checked {
id: thread_id(),
inner: ManuallyDrop::new(future),
location: Location::caller(),
};
unsafe { async_task::spawn_unchecked(future, schedule) }
}

View File

@@ -13,44 +13,45 @@ use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
pub trait Scheduler: Send + Sync {
fn block(&self, future: LocalBoxFuture<()>, timeout: Option<Duration>);
// todo!("consider not taking sessions")
fn block(
&self,
session_id: Option<SessionId>,
future: LocalBoxFuture<()>,
timeout: Option<Duration>,
);
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable);
fn schedule_background(&self, runnable: Runnable);
fn timer(&self, timeout: Duration) -> Timer;
fn is_main_thread(&self) -> bool;
}
impl dyn Scheduler {
pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
let mut output = None;
self.block(async { output = Some(future.await) }.boxed_local(), None);
output.unwrap()
}
pub fn block_with_timeout<Fut: Unpin + Future>(
&self,
future: &mut Fut,
timeout: Duration,
) -> Option<Fut::Output> {
let mut output = None;
self.block(
async { output = Some(future.await) }.boxed_local(),
Some(timeout),
);
output
fn clock(&self) -> Arc<dyn Clock>;
fn as_test(&self) -> &TestScheduler {
panic!("this is not a test scheduler")
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct SessionId(u16);
impl SessionId {
pub fn new(id: u16) -> Self {
SessionId(id)
}
}
pub struct Timer(oneshot::Receiver<()>);
impl Timer {
pub fn new(rx: oneshot::Receiver<()>) -> Self {
Timer(rx)
}
}
impl Future for Timer {
type Output = ();

View File

@@ -1,31 +1,37 @@
use crate::{
BackgroundExecutor, Clock as _, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer,
BackgroundExecutor, Clock, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer,
};
use async_task::Runnable;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use backtrace::{Backtrace, BacktraceFrame};
use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture};
use parking_lot::Mutex;
use rand::prelude::*;
use std::{
collections::VecDeque,
any::type_name_of_val,
collections::{BTreeMap, VecDeque},
env,
fmt::Write,
future::Future,
mem,
ops::RangeInclusive,
panic::{self, AssertUnwindSafe},
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
},
task::{Context, Poll, Wake, Waker},
thread,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread::{self, Thread},
time::{Duration, Instant},
};
const PENDING_TRACES_VAR_NAME: &str = "PENDING_TRACES";
pub struct TestScheduler {
clock: Arc<TestClock>,
rng: Arc<Mutex<StdRng>>,
state: Mutex<SchedulerState>,
pub thread_id: thread::ThreadId,
pub config: SchedulerConfig,
state: Arc<Mutex<SchedulerState>>,
thread: Thread,
}
impl TestScheduler {
@@ -52,26 +58,30 @@ impl TestScheduler {
/// Run a test once with a specific seed
pub fn with_seed<R>(seed: u64, f: impl AsyncFnOnce(Arc<TestScheduler>) -> R) -> R {
let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::with_seed(seed)));
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed)));
let future = f(scheduler.clone());
let result = scheduler.block_on(future);
scheduler.run();
let result = scheduler.foreground().block_on(future);
scheduler.run(); // Ensure spawned tasks finish up before returning in tests
result
}
pub fn new(config: SchedulerConfig) -> Self {
pub fn new(config: TestSchedulerConfig) -> Self {
Self {
rng: Arc::new(Mutex::new(StdRng::seed_from_u64(config.seed))),
state: Mutex::new(SchedulerState {
state: Arc::new(Mutex::new(SchedulerState {
runnables: VecDeque::new(),
timers: Vec::new(),
blocked_sessions: Vec::new(),
randomize_order: config.randomize_order,
allow_parking: config.allow_parking,
timeout_ticks: config.timeout_ticks,
next_session_id: SessionId(0),
}),
thread_id: thread::current().id(),
capture_pending_traces: config.capture_pending_traces,
pending_traces: BTreeMap::new(),
next_trace_id: TraceId(0),
})),
clock: Arc::new(TestClock::new()),
config,
thread: thread::current(),
}
}
@@ -83,6 +93,18 @@ impl TestScheduler {
self.rng.clone()
}
pub fn set_timeout_ticks(&self, timeout_ticks: RangeInclusive<usize>) {
self.state.lock().timeout_ticks = timeout_ticks;
}
pub fn allow_parking(&self) {
self.state.lock().allow_parking = true;
}
pub fn forbid_parking(&self) {
self.state.lock().allow_parking = false;
}
/// Create a foreground executor for this scheduler
pub fn foreground(self: &Arc<Self>) -> ForegroundExecutor {
let session_id = {
@@ -98,16 +120,17 @@ impl TestScheduler {
BackgroundExecutor::new(self.clone())
}
pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
(self as &dyn Scheduler).block_on(future)
}
pub fn yield_random(&self) -> Yield {
Yield(self.rng.lock().random_range(0..20))
let rng = &mut *self.rng.lock();
if rng.random_bool(0.1) {
Yield(rng.random_range(10..20))
} else {
Yield(rng.random_range(0..2))
}
}
pub fn run(&self) {
while self.step() || self.advance_clock() {
while self.step() {
// Continue until no work remains
}
}
@@ -125,7 +148,16 @@ impl TestScheduler {
return true;
}
let runnable = self.state.lock().runnables.pop_front();
let runnable = {
let state = &mut *self.state.lock();
let ix = state.runnables.iter().position(|runnable| {
runnable
.session_id
.is_none_or(|session_id| !state.blocked_sessions.contains(&session_id))
});
ix.and_then(|ix| state.runnables.remove(ix))
};
if let Some(runnable) = runnable {
runnable.run();
return true;
@@ -134,19 +166,120 @@ impl TestScheduler {
false
}
fn advance_clock(&self) -> bool {
fn advance_clock_to_next_timer(&self) -> bool {
if let Some(timer) = self.state.lock().timers.first() {
self.clock.set_now(timer.expiration);
self.clock.advance(timer.expiration - self.clock.now());
true
} else {
false
}
}
pub fn advance_clock(&self, duration: Duration) {
let next_now = self.clock.now() + duration;
loop {
self.run();
if let Some(timer) = self.state.lock().timers.first()
&& timer.expiration <= next_now
{
self.clock.advance(timer.expiration - self.clock.now());
} else {
break;
}
}
self.clock.advance(next_now - self.clock.now());
}
fn park(&self, deadline: Option<Instant>) -> bool {
if self.state.lock().allow_parking {
if let Some(deadline) = deadline {
let now = Instant::now();
let timeout = deadline.saturating_duration_since(now);
thread::park_timeout(timeout);
now.elapsed() < timeout
} else {
thread::park();
true
}
} else if deadline.is_some() {
false
} else if self.state.lock().capture_pending_traces {
let mut pending_traces = String::new();
for (_, trace) in mem::take(&mut self.state.lock().pending_traces) {
writeln!(pending_traces, "{:?}", exclude_wakers_from_trace(trace)).unwrap();
}
panic!("Parking forbidden. Pending traces:\n{}", pending_traces);
} else {
panic!(
"Parking forbidden. Re-run with {PENDING_TRACES_VAR_NAME}=1 to show pending traces"
);
}
}
}
impl Scheduler for TestScheduler {
fn is_main_thread(&self) -> bool {
thread::current().id() == self.thread_id
/// Block until the given future completes, with an optional timeout. If the
/// future is unable to make progress at any moment before the timeout and
/// no other tasks or timers remain, we panic unless parking is allowed. If
/// parking is allowed, we block up to the timeout or indefinitely if none
/// is provided. This is to allow testing a mix of deterministic and
/// non-deterministic async behavior, such as when interacting with I/O in
/// an otherwise deterministic test.
fn block(
&self,
session_id: Option<SessionId>,
mut future: LocalBoxFuture<()>,
timeout: Option<Duration>,
) {
if let Some(session_id) = session_id {
self.state.lock().blocked_sessions.push(session_id);
}
let deadline = timeout.map(|timeout| Instant::now() + timeout);
let awoken = Arc::new(AtomicBool::new(false));
let waker = Box::new(TracingWaker {
id: None,
awoken: awoken.clone(),
thread: self.thread.clone(),
state: self.state.clone(),
});
let waker = unsafe { Waker::new(Box::into_raw(waker) as *const (), &WAKER_VTABLE) };
let max_ticks = if timeout.is_some() {
self.rng
.lock()
.random_range(self.state.lock().timeout_ticks.clone())
} else {
usize::MAX
};
let mut cx = Context::from_waker(&waker);
for _ in 0..max_ticks {
let Poll::Pending = future.poll_unpin(&mut cx) else {
break;
};
let mut stepped = None;
while self.rng.lock().random() {
let stepped = stepped.get_or_insert(false);
if self.step() {
*stepped = true;
} else {
break;
}
}
let stepped = stepped.unwrap_or(true);
let awoken = awoken.swap(false, SeqCst);
if !stepped && !awoken && !self.advance_clock_to_next_timer() {
if !self.park(deadline) {
break;
}
}
}
if session_id.is_some() {
self.state.lock().blocked_sessions.pop();
}
}
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable) {
@@ -170,6 +303,8 @@ impl Scheduler for TestScheduler {
runnable,
},
);
drop(state);
self.thread.unpark();
}
fn schedule_background(&self, runnable: Runnable) {
@@ -186,83 +321,40 @@ impl Scheduler for TestScheduler {
runnable,
},
);
drop(state);
self.thread.unpark();
}
fn timer(&self, duration: Duration) -> Timer {
let (tx, rx) = oneshot::channel();
let expiration = self.clock.now() + ChronoDuration::from_std(duration).unwrap();
let state = &mut *self.state.lock();
state.timers.push(ScheduledTimer {
expiration,
expiration: self.clock.now() + duration,
_notify: tx,
});
state.timers.sort_by_key(|timer| timer.expiration);
Timer(rx)
}
/// Block until the given future completes, with an optional timeout. If the
/// future is unable to make progress at any moment before the timeout and
/// no other tasks or timers remain, we panic unless parking is allowed. If
/// parking is allowed, we block up to the timeout or indefinitely if none
/// is provided. This is to allow testing a mix of deterministic and
/// non-deterministic async behavior, such as when interacting with I/O in
/// an otherwise deterministic test.
fn block(&self, mut future: LocalBoxFuture<()>, timeout: Option<Duration>) {
let (parker, unparker) = parking::pair();
let deadline = timeout.map(|timeout| Instant::now() + timeout);
let awoken = Arc::new(AtomicBool::new(false));
let waker = Waker::from(Arc::new(WakerFn::new({
let awoken = awoken.clone();
move || {
awoken.store(true, SeqCst);
unparker.unpark();
}
})));
let max_ticks = if timeout.is_some() {
self.rng
.lock()
.random_range(0..=self.config.max_timeout_ticks)
} else {
usize::MAX
};
let mut cx = Context::from_waker(&waker);
fn clock(&self) -> Arc<dyn Clock> {
self.clock.clone()
}
for _ in 0..max_ticks {
let Poll::Pending = future.poll_unpin(&mut cx) else {
break;
};
let mut stepped = None;
while self.rng.lock().random() && stepped.unwrap_or(true) {
*stepped.get_or_insert(false) |= self.step();
}
let stepped = stepped.unwrap_or(true);
let awoken = awoken.swap(false, SeqCst);
if !stepped && !awoken && !self.advance_clock() {
if self.state.lock().allow_parking {
if !park(&parker, deadline) {
break;
}
} else if deadline.is_some() {
break;
} else {
panic!("Parking forbidden");
}
}
}
fn as_test(&self) -> &TestScheduler {
self
}
}
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
pub struct TestSchedulerConfig {
pub seed: u64,
pub randomize_order: bool,
pub allow_parking: bool,
pub max_timeout_ticks: usize,
pub capture_pending_traces: bool,
pub timeout_ticks: RangeInclusive<usize>,
}
impl SchedulerConfig {
impl TestSchedulerConfig {
pub fn with_seed(seed: u64) -> Self {
Self {
seed,
@@ -271,13 +363,15 @@ impl SchedulerConfig {
}
}
impl Default for SchedulerConfig {
impl Default for TestSchedulerConfig {
fn default() -> Self {
Self {
seed: 0,
randomize_order: true,
allow_parking: false,
max_timeout_ticks: 1000,
capture_pending_traces: env::var(PENDING_TRACES_VAR_NAME)
.map_or(false, |var| var == "1" || var == "true"),
timeout_ticks: 0..=1000,
}
}
}
@@ -294,35 +388,104 @@ impl ScheduledRunnable {
}
struct ScheduledTimer {
expiration: DateTime<Utc>,
expiration: Instant,
_notify: oneshot::Sender<()>,
}
struct SchedulerState {
runnables: VecDeque<ScheduledRunnable>,
timers: Vec<ScheduledTimer>,
blocked_sessions: Vec<SessionId>,
randomize_order: bool,
allow_parking: bool,
timeout_ticks: RangeInclusive<usize>,
next_session_id: SessionId,
capture_pending_traces: bool,
next_trace_id: TraceId,
pending_traces: BTreeMap<TraceId, Backtrace>,
}
struct WakerFn<F> {
f: F,
const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
TracingWaker::clone_raw,
TracingWaker::wake_raw,
TracingWaker::wake_by_ref_raw,
TracingWaker::drop_raw,
);
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
struct TraceId(usize);
struct TracingWaker {
id: Option<TraceId>,
awoken: Arc<AtomicBool>,
thread: Thread,
state: Arc<Mutex<SchedulerState>>,
}
impl<F: Fn()> WakerFn<F> {
fn new(f: F) -> Self {
Self { f }
impl Clone for TracingWaker {
fn clone(&self) -> Self {
let mut state = self.state.lock();
let id = if state.capture_pending_traces {
let id = state.next_trace_id;
state.next_trace_id.0 += 1;
state.pending_traces.insert(id, Backtrace::new_unresolved());
Some(id)
} else {
None
};
Self {
id,
awoken: self.awoken.clone(),
thread: self.thread.clone(),
state: self.state.clone(),
}
}
}
impl<F: Fn()> Wake for WakerFn<F> {
fn wake(self: Arc<Self>) {
(self.f)();
impl Drop for TracingWaker {
fn drop(&mut self) {
if let Some(id) = self.id {
self.state.lock().pending_traces.remove(&id);
}
}
}
impl TracingWaker {
fn wake(self) {
self.wake_by_ref();
}
fn wake_by_ref(self: &Arc<Self>) {
(self.f)();
fn wake_by_ref(&self) {
if let Some(id) = self.id {
self.state.lock().pending_traces.remove(&id);
}
self.awoken.store(true, SeqCst);
self.thread.unpark();
}
fn clone_raw(waker: *const ()) -> RawWaker {
let waker = waker as *const TracingWaker;
let waker = unsafe { &*waker };
RawWaker::new(
Box::into_raw(Box::new(waker.clone())) as *const (),
&WAKER_VTABLE,
)
}
fn wake_raw(waker: *const ()) {
let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) };
waker.wake();
}
fn wake_by_ref_raw(waker: *const ()) {
let waker = waker as *const TracingWaker;
let waker = unsafe { &*waker };
waker.wake_by_ref();
}
fn drop_raw(waker: *const ()) {
let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) };
drop(waker);
}
}
@@ -342,11 +505,20 @@ impl Future for Yield {
}
}
fn park(parker: &parking::Parker, deadline: Option<Instant>) -> bool {
if let Some(deadline) = deadline {
parker.park_deadline(deadline)
} else {
parker.park();
true
fn exclude_wakers_from_trace(mut trace: Backtrace) -> Backtrace {
trace.resolve();
let mut frames: Vec<BacktraceFrame> = trace.into();
let waker_clone_frame_ix = frames.iter().position(|frame| {
frame.symbols().iter().any(|symbol| {
symbol
.name()
.is_some_and(|name| format!("{name:#?}") == type_name_of_val(&Waker::clone))
})
});
if let Some(waker_clone_frame_ix) = waker_clone_frame_ix {
frames.drain(..waker_clone_frame_ix + 1);
}
Backtrace::from(frames)
}

View File

@@ -153,7 +153,7 @@ fn test_randomize_order() {
// Test deterministic mode: different seeds should produce same execution order
let mut deterministic_results = HashSet::new();
for seed in 0..10 {
let config = SchedulerConfig {
let config = TestSchedulerConfig {
seed,
randomize_order: false,
..Default::default()
@@ -173,7 +173,7 @@ fn test_randomize_order() {
// Test randomized mode: different seeds can produce different execution orders
let mut randomized_results = HashSet::new();
for seed in 0..20 {
let config = SchedulerConfig::with_seed(seed);
let config = TestSchedulerConfig::with_seed(seed);
let order = block_on(capture_execution_order(config));
assert_eq!(order.len(), 6);
randomized_results.insert(order);
@@ -186,7 +186,7 @@ fn test_randomize_order() {
);
}
async fn capture_execution_order(config: SchedulerConfig) -> Vec<String> {
async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
let scheduler = Arc::new(TestScheduler::new(config));
let foreground = scheduler.foreground();
let background = scheduler.background();
@@ -221,49 +221,55 @@ async fn capture_execution_order(config: SchedulerConfig) -> Vec<String> {
#[test]
fn test_block() {
let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
let executor = BackgroundExecutor::new(scheduler);
let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let (tx, rx) = oneshot::channel();
// Spawn background task to send value
let _ = executor
let _ = scheduler
.background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
// Block on receiving the value
let result = executor.block_on(async { rx.await.unwrap() });
let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
#[test]
#[should_panic(expected = "Parking forbidden")]
#[should_panic(expected = "<futures_channel::oneshot::Inner<()>>::recv")]
fn test_parking_panics() {
let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
let executor = BackgroundExecutor::new(scheduler);
executor.block_on(future::pending::<()>());
let config = TestSchedulerConfig {
capture_pending_traces: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
scheduler.foreground().block_on(async {
let (_tx, rx) = oneshot::channel::<()>();
rx.await.unwrap(); // This will never complete
});
}
#[test]
fn test_block_with_parking() {
let config = SchedulerConfig {
let config = TestSchedulerConfig {
allow_parking: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
let executor = BackgroundExecutor::new(scheduler);
let (tx, rx) = oneshot::channel();
// Spawn background task to send value
let _ = executor
let _ = scheduler
.background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
// Block on receiving the value (will park if needed)
let result = executor.block_on(async { rx.await.unwrap() });
let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
@@ -298,30 +304,31 @@ fn test_helper_methods() {
fn test_block_with_timeout() {
// Test case: future completes within timeout
TestScheduler::once(async |scheduler| {
let background = scheduler.background();
let mut future = future::ready(42);
let output = background.block_with_timeout(&mut future, Duration::from_millis(100));
assert_eq!(output, Some(42));
let foreground = scheduler.foreground();
let future = future::ready(42);
let output = foreground.block_with_timeout(Duration::from_millis(100), future);
assert_eq!(output.unwrap(), 42);
});
// Test case: future times out
TestScheduler::once(async |scheduler| {
let background = scheduler.background();
let mut future = future::pending::<()>();
let output = background.block_with_timeout(&mut future, Duration::from_millis(50));
assert_eq!(output, None);
let foreground = scheduler.foreground();
let future = future::pending::<()>();
let output = foreground.block_with_timeout(Duration::from_millis(50), future);
let _ = output.expect_err("future should not have finished");
});
// Test case: future makes progress via timer but still times out
let mut results = BTreeSet::new();
TestScheduler::many(100, async |scheduler| {
let background = scheduler.background();
let mut task = background.spawn(async move {
let task = scheduler.background().spawn(async move {
Yield { polls: 10 }.await;
42
});
let output = background.block_with_timeout(&mut task, Duration::from_millis(50));
results.insert(output);
let output = scheduler
.foreground()
.block_with_timeout(Duration::from_millis(50), task);
results.insert(output.ok());
});
assert_eq!(
results.into_iter().collect::<Vec<_>>(),
@@ -329,6 +336,33 @@ fn test_block_with_timeout() {
);
}
// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id.
#[test]
fn test_block_does_not_progress_same_session_foreground() {
let mut task2_made_progress_once = false;
TestScheduler::many(1000, async |scheduler| {
let foreground1 = scheduler.foreground();
let foreground2 = scheduler.foreground();
let task1 = foreground1.spawn(async move {});
let task2 = foreground2.spawn(async move {});
foreground1.block_on(async {
scheduler.yield_random().await;
assert!(!task1.is_ready());
task2_made_progress_once |= task2.is_ready();
});
task1.await;
task2.await;
});
assert!(
task2_made_progress_once,
"Expected task from different foreground executor to make progress (at least once)"
);
}
struct Yield {
polls: usize,
}

View File

@@ -93,8 +93,8 @@ impl PickerDelegate for Delegate {
) -> Task<()> {
let candidates = self.candidates.clone();
self.matches = cx
.background_executor()
.block(fuzzy::match_strings(
.foreground_executor()
.block_on(fuzzy::match_strings(
&candidates,
&query,
true,

View File

@@ -32,7 +32,7 @@ assistant_tools.workspace = true
audio.workspace = true
auto_update.workspace = true
auto_update_ui.workspace = true
backtrace = "0.3"
backtrace.workspace = true
bincode.workspace = true
breadcrumbs.workspace = true
call.workspace = true

View File

@@ -262,10 +262,10 @@ pub fn main() {
let app = Application::new().with_assets(Assets);
let system_id = app.background_executor().block(system_id()).ok();
let installation_id = app.background_executor().block(installation_id()).ok();
let system_id = app.foreground_executor().block_on(system_id()).ok();
let installation_id = app.foreground_executor().block_on(installation_id()).ok();
let session_id = Uuid::new_v4().to_string();
let session = app.background_executor().block(Session::new());
let session = app.foreground_executor().block_on(Session::new());
app.background_executor()
.spawn(crashes::init(InitCrashHandler {
@@ -1282,7 +1282,7 @@ fn load_embedded_fonts(cx: &App) {
let embedded_fonts = Mutex::new(Vec::new());
let executor = cx.background_executor();
executor.block(executor.scoped(|scope| {
cx.foreground_executor().block_on(executor.scoped(|scope| {
for font_path in &font_paths {
if !font_path.ends_with(".ttf") {
continue;

View File

@@ -1229,12 +1229,12 @@ pub fn handle_settings_file_changes(
// Initial load of both settings files
let global_content = cx
.background_executor()
.block(global_settings_file_rx.next())
.foreground_executor()
.block_on(global_settings_file_rx.next())
.unwrap();
let user_content = cx
.background_executor()
.block(user_settings_file_rx.next())
.foreground_executor()
.block_on(user_settings_file_rx.next())
.unwrap();
SettingsStore::update_global(cx, |store, cx| {