Compare commits

...

3 Commits

Author SHA1 Message Date
Max Brunsfeld
5570abf9a2 Clippy 2025-12-18 12:53:03 -08:00
Max Brunsfeld
d5b1fafb1f Use async streaming when detecting matches 2025-12-18 12:49:04 -08:00
Max Brunsfeld
81dd5b9c9c Allow project search to be efficiently cancellable 2025-12-18 10:34:15 -08:00
15 changed files with 124 additions and 70 deletions

View File

@@ -170,12 +170,15 @@ impl AgentTool for GrepTool {
Err(error) => return Task::ready(Err(error)),
};
let results = self
let (results, search_task) = self
.project
.update(cx, |project, cx| project.search(query, cx));
let project = self.project.downgrade();
cx.spawn(async move |cx| {
// Keep the search alive for the duration of result iteration. Dropping this task is the
// cancellation mechanism; we intentionally do not detach it.
let _search_task = search_task;
futures::pin_mut!(results);
let mut output = String::new();

View File

@@ -5179,7 +5179,7 @@ async fn test_project_search(
// Perform a search as the guest.
let mut results = HashMap::default();
let search_rx = project_b.update(cx_b, |project, cx| {
let (search_rx, search_task) = project_b.update(cx_b, |project, cx| {
project.search(
SearchQuery::text(
"world",
@@ -5195,6 +5195,8 @@ async fn test_project_search(
cx,
)
});
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
let _search_task = search_task;
while let Ok(result) = search_rx.recv().await {
match result {
SearchResult::Buffer { buffer, ranges } => {

View File

@@ -886,7 +886,7 @@ impl RandomizedTest for ProjectCollaborationTest {
if detach { "detaching" } else { "awaiting" }
);
let search = project.update(cx, |project, cx| {
let (search, search_task) = project.update(cx, |project, cx| {
project.search(
SearchQuery::text(
query,
@@ -904,6 +904,9 @@ impl RandomizedTest for ProjectCollaborationTest {
});
drop(project);
let search = cx.executor().spawn(async move {
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
let _search_task = search_task;
let mut results = HashMap::default();
while let Ok(result) = search.recv().await {
if let SearchResult::Buffer { buffer, ranges } = result {

View File

@@ -8,6 +8,7 @@ use dap::{
},
};
use fs::Fs;
use futures::AsyncReadExt as _;
use gpui::{AsyncApp, SharedString};
use language::LanguageName;
use log::warn;
@@ -557,10 +558,18 @@ async fn handle_envs(
continue;
};
if let Ok(file) = fs.open_sync(&path).await {
let file_envs: HashMap<String, String> = dotenvy::from_read_iter(file)
.filter_map(Result::ok)
.collect();
if let Ok(mut file) = fs.open_read(&path).await {
let mut bytes = Vec::new();
if file.read_to_end(&mut bytes).await.is_err() {
warn!("While starting Go debug session: failed to read env file {path:?}");
continue;
}
let file_envs: HashMap<String, String> =
dotenvy::from_read_iter(std::io::Cursor::new(bytes))
.filter_map(Result::ok)
.collect();
envs.extend(file_envs.iter().map(|(k, v)| (k.clone(), v.clone())));
env_vars.extend(file_envs);
} else {

View File

@@ -12,6 +12,7 @@ use extension::{
WorktreeDelegate,
};
use fs::{Fs, normalize_path};
use futures::AsyncReadExt;
use futures::future::LocalBoxFuture;
use futures::{
Future, FutureExt, StreamExt as _,
@@ -763,13 +764,14 @@ impl WasmExtension {
let mut wasm_file = wasm_host
.fs
.open_sync(&path)
.open_read(&path)
.await
.context(format!("opening wasm file, path: {path:?}"))?;
let mut wasm_bytes = Vec::new();
wasm_file
.read_to_end(&mut wasm_bytes)
.await
.context(format!("reading wasm file, path: {path:?}"))?;
wasm_host

View File

@@ -66,6 +66,8 @@ use std::ffi::OsStr;
#[cfg(any(test, feature = "test-support"))]
pub use fake_git_repo::{LOAD_HEAD_TEXT_TASK, LOAD_INDEX_TEXT_TASK};
pub type AsyncReadBox = Pin<Box<dyn AsyncRead + Send>>;
pub trait Watcher: Send + Sync {
fn add(&self, path: &Path) -> Result<()>;
fn remove(&self, path: &Path) -> Result<()>;
@@ -116,7 +118,7 @@ pub trait Fs: Send + Sync {
self.remove_file(path, options).await
}
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>>;
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>>;
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox>;
async fn load(&self, path: &Path) -> Result<String> {
Ok(String::from_utf8(self.load_bytes(path).await?)?)
}
@@ -732,8 +734,8 @@ impl Fs for RealFs {
Ok(())
}
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
Ok(Box::new(std::fs::File::open(path)?))
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
Ok(Box::pin(smol::fs::File::open(path).await?))
}
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {
@@ -2530,9 +2532,9 @@ impl Fs for FakeFs {
Ok(())
}
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
let bytes = self.load_internal(path).await?;
Ok(Box::new(io::Cursor::new(bytes)))
Ok(Box::pin(futures::io::Cursor::new(bytes)))
}
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {

View File

@@ -4141,7 +4141,11 @@ impl Project {
searcher.into_handle(query, cx)
}
pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
pub fn search(
&mut self,
query: SearchQuery,
cx: &mut Context<Self>,
) -> (Receiver<SearchResult>, Task<()>) {
self.search_impl(query, cx).results(cx)
}
@@ -5025,10 +5029,14 @@ impl Project {
let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?;
let query =
SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
let results = this.update(&mut cx, |this, cx| {
let (results, search_task) = this.update(&mut cx, |this, cx| {
this.search_impl(query, cx).matching_buffers(cx)
})?;
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
// We intentionally do not detach it.
let _search_task = search_task;
let mut response = proto::FindSearchCandidatesResponse {
buffer_ids: Vec::new(),
};

View File

@@ -1,7 +1,6 @@
use std::{
cell::LazyCell,
collections::BTreeSet,
io::{BufRead, BufReader},
ops::Range,
path::{Path, PathBuf},
pin::pin,
@@ -11,7 +10,7 @@ use std::{
use anyhow::Context;
use collections::HashSet;
use fs::Fs;
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use futures::{AsyncBufReadExt, SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
use gpui::{App, AppContext, AsyncApp, Entity, Task};
use language::{Buffer, BufferSnapshot};
use parking_lot::Mutex;
@@ -68,13 +67,14 @@ pub struct SearchResultsHandle {
}
impl SearchResultsHandle {
pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
(self.trigger_search)(cx).detach();
self.results
pub fn results(self, cx: &mut App) -> (Receiver<SearchResult>, Task<()>) {
let task = (self.trigger_search)(cx);
(self.results, task)
}
pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
(self.trigger_search)(cx).detach();
self.matching_buffers
pub fn matching_buffers(self, cx: &mut App) -> (Receiver<Entity<Buffer>>, Task<()>) {
let task = (self.trigger_search)(cx);
(self.matching_buffers, task)
}
}
@@ -683,29 +683,37 @@ impl RequestHandler<'_> {
async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
_=maybe!(async move {
let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
return anyhow::Ok(());
};
let mut file = BufReader::new(file);
let file_start = file.fill_buf()?;
let file = self
.fs
.context("Trying to query filesystem in remote project search")?
.open_read(&abs_path)
.await?;
if let Err(Some(starting_position)) =
std::str::from_utf8(file_start).map_err(|e| e.error_len())
{
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
// That way we can still match files in a streaming fashion without having to look at "obviously binary" files.
log::debug!(
"Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
);
return Ok(());
let mut file = futures::io::BufReader::new(file);
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
// That way we can still match files in a streaming fashion without having to look at "obviously binary" files.
//
// Use `fill_buf` so we can "peek" without consuming bytes, and reuse the same stream for detection.
let buffer = file.fill_buf().await?;
let prefix_len = buffer.len().min(8192);
if let Err(error) = std::str::from_utf8(&buffer[..prefix_len]) {
if let Some(starting_position) = error.error_len() {
log::debug!(
"Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
);
return Ok(());
}
}
if self.query.detect(file).unwrap_or(false) {
if self.query.detect(file).await.unwrap_or(false) {
// Yes, we should scan the whole file.
entry.should_scan_tx.send(entry.path).await?;
}
Ok(())
anyhow::Ok(())
}).await;
}

View File

@@ -10400,7 +10400,10 @@ async fn search(
query: SearchQuery,
cx: &mut gpui::TestAppContext,
) -> Result<HashMap<String, Vec<Range<usize>>>> {
let search_rx = project.update(cx, |project, cx| project.search(query, cx));
let (search_rx, search_task) = project.update(cx, |project, cx| project.search(query, cx));
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
let _search_task = search_task;
let mut results = HashMap::default();
while let Ok(search_result) = search_rx.recv().await {
match search_result {

View File

@@ -2,13 +2,13 @@ use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
use anyhow::Result;
use client::proto;
use fancy_regex::{Captures, Regex, RegexBuilder};
use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};
use gpui::Entity;
use itertools::Itertools as _;
use language::{Buffer, BufferSnapshot, CharKind};
use smol::future::yield_now;
use std::{
borrow::Cow,
io::{BufRead, BufReader, Read},
ops::Range,
sync::{Arc, LazyLock},
};
@@ -326,37 +326,38 @@ impl SearchQuery {
}
}
pub(crate) fn detect(
&self,
mut reader: BufReader<Box<dyn Read + Send + Sync>>,
) -> Result<bool> {
if self.as_str().is_empty() {
pub(crate) async fn detect(&self, mut reader: impl AsyncBufRead + Unpin) -> Result<bool> {
let query_str = self.as_str();
let needle_len = query_str.len();
if needle_len == 0 {
return Ok(false);
}
let mut text = String::new();
match self {
Self::Text { search, .. } => {
let mat = search.stream_find_iter(reader).next();
match mat {
Some(Ok(_)) => Ok(true),
Some(Err(err)) => Err(err.into()),
None => Ok(false),
if query_str.contains('\n') {
reader.read_to_string(&mut text).await?;
Ok(search.find(&text).is_some())
} else {
while reader.read_line(&mut text).await? > 0 {
if search.find(&text).is_some() {
return Ok(true);
}
text.clear();
}
Ok(false)
}
}
Self::Regex {
regex, multiline, ..
} => {
if *multiline {
let mut text = String::new();
if let Err(err) = reader.read_to_string(&mut text) {
Err(err.into())
} else {
Ok(regex.find(&text)?.is_some())
}
reader.read_to_string(&mut text).await?;
Ok(regex.find(&text)?.is_some())
} else {
for line in reader.lines() {
let line = line?;
if regex.find(&line)?.is_some() {
while reader.read_line(&mut text).await? > 0 {
if regex.find(&text)?.is_some() {
return Ok(true);
}
}

View File

@@ -1,5 +1,4 @@
use std::{
io::{BufRead, BufReader},
path::{Path, PathBuf},
pin::pin,
sync::{Arc, atomic::AtomicUsize},
@@ -8,7 +7,7 @@ use std::{
use anyhow::{Context as _, Result, anyhow, bail};
use collections::{HashMap, HashSet};
use fs::{Fs, copy_recursive};
use futures::{FutureExt, SinkExt, future::Shared};
use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader};
use gpui::{
App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
};
@@ -1024,15 +1023,15 @@ impl WorktreeStore {
let mut input = pin!(input);
while let Some(mut entry) = input.next().await {
let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
let Some(file) = fs.open_read(&abs_path).await.log_err() else {
continue;
};
let mut file = BufReader::new(file);
let file_start = file.fill_buf()?;
let file_start = file.fill_buf().await?;
if let Err(Some(starting_position)) =
std::str::from_utf8(file_start).map_err(|e| e.error_len())
std::str::from_utf8(&file_start).map_err(|e| e.error_len())
{
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
// That way we can still match files in a streaming fashion without having to look at "obviously binary" files.
@@ -1042,7 +1041,7 @@ impl WorktreeStore {
continue;
}
if query.detect(file).unwrap_or(false) {
if query.detect(file).await.unwrap_or(false) {
entry.respond.send(entry.path).await?
}
}

View File

@@ -102,9 +102,11 @@ fn main() -> Result<(), anyhow::Error> {
println!("Starting a project search");
let timer = std::time::Instant::now();
let mut first_match = None;
let matches = project
let (matches, _search_task) = project
.update(cx, |this, cx| this.search(query, cx))
.unwrap();
let mut matched_files = 0;
let mut matched_chunks = 0;
while let Ok(match_result) = matches.recv().await {

View File

@@ -771,7 +771,7 @@ impl HeadlessProject {
message.query.context("missing query field")?,
PathStyle::local(),
)?;
let results = this.update(&mut cx, |this, cx| {
let (results, search_task) = this.update(&mut cx, |this, cx| {
project::Search::local(
this.fs.clone(),
this.buffer_store.clone(),
@@ -783,6 +783,10 @@ impl HeadlessProject {
.matching_buffers(cx)
})?;
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
// We intentionally do not detach it.
let _search_task = search_task;
let mut response = proto::FindSearchCandidatesResponse {
buffer_ids: Vec::new(),
};

View File

@@ -194,7 +194,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
cx.run_until_parked();
async fn do_search(project: &Entity<Project>, mut cx: TestAppContext) -> Entity<Buffer> {
let receiver = project.update(&mut cx, |project, cx| {
let (receiver, search_task) = project.update(&mut cx, |project, cx| {
project.search(
SearchQuery::text(
"project",
@@ -211,6 +211,10 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
)
});
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
// We intentionally do not detach it.
let _search_task = search_task;
let first_response = receiver.recv().await.unwrap();
let SearchResult::Buffer { buffer, .. } = first_response else {
panic!("incorrect result");

View File

@@ -303,7 +303,7 @@ impl ProjectSearch {
}
fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) {
let search = self.project.update(cx, |project, cx| {
let (search, project_search_task) = self.project.update(cx, |project, cx| {
project
.search_history_mut(SearchInputKind::Query)
.add(&mut self.search_history_cursor, query.as_str().to_string());
@@ -326,6 +326,10 @@ impl ProjectSearch {
self.active_query = Some(query);
self.match_ranges.clear();
self.pending_search = Some(cx.spawn(async move |project_search, cx| {
// Keep the search task alive for the lifetime of this pending search task.
// Dropping it is the cancellation mechanism; we intentionally do not detach it.
let _project_search_task = project_search_task;
let mut matches = pin!(search.ready_chunks(1024));
project_search
.update(cx, |project_search, cx| {