Compare commits
3 Commits
git-integr
...
cancellabl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5570abf9a2 | ||
|
|
d5b1fafb1f | ||
|
|
81dd5b9c9c |
@@ -170,12 +170,15 @@ impl AgentTool for GrepTool {
|
||||
Err(error) => return Task::ready(Err(error)),
|
||||
};
|
||||
|
||||
let results = self
|
||||
let (results, search_task) = self
|
||||
.project
|
||||
.update(cx, |project, cx| project.search(query, cx));
|
||||
|
||||
let project = self.project.downgrade();
|
||||
cx.spawn(async move |cx| {
|
||||
// Keep the search alive for the duration of result iteration. Dropping this task is the
|
||||
// cancellation mechanism; we intentionally do not detach it.
|
||||
let _search_task = search_task;
|
||||
futures::pin_mut!(results);
|
||||
|
||||
let mut output = String::new();
|
||||
|
||||
@@ -5179,7 +5179,7 @@ async fn test_project_search(
|
||||
|
||||
// Perform a search as the guest.
|
||||
let mut results = HashMap::default();
|
||||
let search_rx = project_b.update(cx_b, |project, cx| {
|
||||
let (search_rx, search_task) = project_b.update(cx_b, |project, cx| {
|
||||
project.search(
|
||||
SearchQuery::text(
|
||||
"world",
|
||||
@@ -5195,6 +5195,8 @@ async fn test_project_search(
|
||||
cx,
|
||||
)
|
||||
});
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
let _search_task = search_task;
|
||||
while let Ok(result) = search_rx.recv().await {
|
||||
match result {
|
||||
SearchResult::Buffer { buffer, ranges } => {
|
||||
|
||||
@@ -886,7 +886,7 @@ impl RandomizedTest for ProjectCollaborationTest {
|
||||
if detach { "detaching" } else { "awaiting" }
|
||||
);
|
||||
|
||||
let search = project.update(cx, |project, cx| {
|
||||
let (search, search_task) = project.update(cx, |project, cx| {
|
||||
project.search(
|
||||
SearchQuery::text(
|
||||
query,
|
||||
@@ -904,6 +904,9 @@ impl RandomizedTest for ProjectCollaborationTest {
|
||||
});
|
||||
drop(project);
|
||||
let search = cx.executor().spawn(async move {
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
let _search_task = search_task;
|
||||
|
||||
let mut results = HashMap::default();
|
||||
while let Ok(result) = search.recv().await {
|
||||
if let SearchResult::Buffer { buffer, ranges } = result {
|
||||
|
||||
@@ -8,6 +8,7 @@ use dap::{
|
||||
},
|
||||
};
|
||||
use fs::Fs;
|
||||
use futures::AsyncReadExt as _;
|
||||
use gpui::{AsyncApp, SharedString};
|
||||
use language::LanguageName;
|
||||
use log::warn;
|
||||
@@ -557,10 +558,18 @@ async fn handle_envs(
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Ok(file) = fs.open_sync(&path).await {
|
||||
let file_envs: HashMap<String, String> = dotenvy::from_read_iter(file)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
if let Ok(mut file) = fs.open_read(&path).await {
|
||||
let mut bytes = Vec::new();
|
||||
if file.read_to_end(&mut bytes).await.is_err() {
|
||||
warn!("While starting Go debug session: failed to read env file {path:?}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_envs: HashMap<String, String> =
|
||||
dotenvy::from_read_iter(std::io::Cursor::new(bytes))
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
envs.extend(file_envs.iter().map(|(k, v)| (k.clone(), v.clone())));
|
||||
env_vars.extend(file_envs);
|
||||
} else {
|
||||
|
||||
@@ -12,6 +12,7 @@ use extension::{
|
||||
WorktreeDelegate,
|
||||
};
|
||||
use fs::{Fs, normalize_path};
|
||||
use futures::AsyncReadExt;
|
||||
use futures::future::LocalBoxFuture;
|
||||
use futures::{
|
||||
Future, FutureExt, StreamExt as _,
|
||||
@@ -763,13 +764,14 @@ impl WasmExtension {
|
||||
|
||||
let mut wasm_file = wasm_host
|
||||
.fs
|
||||
.open_sync(&path)
|
||||
.open_read(&path)
|
||||
.await
|
||||
.context(format!("opening wasm file, path: {path:?}"))?;
|
||||
|
||||
let mut wasm_bytes = Vec::new();
|
||||
wasm_file
|
||||
.read_to_end(&mut wasm_bytes)
|
||||
.await
|
||||
.context(format!("reading wasm file, path: {path:?}"))?;
|
||||
|
||||
wasm_host
|
||||
|
||||
@@ -66,6 +66,8 @@ use std::ffi::OsStr;
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub use fake_git_repo::{LOAD_HEAD_TEXT_TASK, LOAD_INDEX_TEXT_TASK};
|
||||
|
||||
pub type AsyncReadBox = Pin<Box<dyn AsyncRead + Send>>;
|
||||
|
||||
pub trait Watcher: Send + Sync {
|
||||
fn add(&self, path: &Path) -> Result<()>;
|
||||
fn remove(&self, path: &Path) -> Result<()>;
|
||||
@@ -116,7 +118,7 @@ pub trait Fs: Send + Sync {
|
||||
self.remove_file(path, options).await
|
||||
}
|
||||
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>>;
|
||||
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>>;
|
||||
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox>;
|
||||
async fn load(&self, path: &Path) -> Result<String> {
|
||||
Ok(String::from_utf8(self.load_bytes(path).await?)?)
|
||||
}
|
||||
@@ -732,8 +734,8 @@ impl Fs for RealFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
|
||||
Ok(Box::new(std::fs::File::open(path)?))
|
||||
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
|
||||
Ok(Box::pin(smol::fs::File::open(path).await?))
|
||||
}
|
||||
|
||||
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {
|
||||
@@ -2530,9 +2532,9 @@ impl Fs for FakeFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn open_sync(&self, path: &Path) -> Result<Box<dyn io::Read + Send + Sync>> {
|
||||
async fn open_read(&self, path: &Path) -> Result<AsyncReadBox> {
|
||||
let bytes = self.load_internal(path).await?;
|
||||
Ok(Box::new(io::Cursor::new(bytes)))
|
||||
Ok(Box::pin(futures::io::Cursor::new(bytes)))
|
||||
}
|
||||
|
||||
async fn open_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>> {
|
||||
|
||||
@@ -4141,7 +4141,11 @@ impl Project {
|
||||
searcher.into_handle(query, cx)
|
||||
}
|
||||
|
||||
pub fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) -> Receiver<SearchResult> {
|
||||
pub fn search(
|
||||
&mut self,
|
||||
query: SearchQuery,
|
||||
cx: &mut Context<Self>,
|
||||
) -> (Receiver<SearchResult>, Task<()>) {
|
||||
self.search_impl(query, cx).results(cx)
|
||||
}
|
||||
|
||||
@@ -5025,10 +5029,14 @@ impl Project {
|
||||
let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?;
|
||||
let query =
|
||||
SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
|
||||
let results = this.update(&mut cx, |this, cx| {
|
||||
let (results, search_task) = this.update(&mut cx, |this, cx| {
|
||||
this.search_impl(query, cx).matching_buffers(cx)
|
||||
})?;
|
||||
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
// We intentionally do not detach it.
|
||||
let _search_task = search_task;
|
||||
|
||||
let mut response = proto::FindSearchCandidatesResponse {
|
||||
buffer_ids: Vec::new(),
|
||||
};
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::{
|
||||
cell::LazyCell,
|
||||
collections::BTreeSet,
|
||||
io::{BufRead, BufReader},
|
||||
ops::Range,
|
||||
path::{Path, PathBuf},
|
||||
pin::pin,
|
||||
@@ -11,7 +10,7 @@ use std::{
|
||||
use anyhow::Context;
|
||||
use collections::HashSet;
|
||||
use fs::Fs;
|
||||
use futures::{SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
|
||||
use futures::{AsyncBufReadExt, SinkExt, StreamExt, select_biased, stream::FuturesOrdered};
|
||||
use gpui::{App, AppContext, AsyncApp, Entity, Task};
|
||||
use language::{Buffer, BufferSnapshot};
|
||||
use parking_lot::Mutex;
|
||||
@@ -68,13 +67,14 @@ pub struct SearchResultsHandle {
|
||||
}
|
||||
|
||||
impl SearchResultsHandle {
|
||||
pub fn results(self, cx: &mut App) -> Receiver<SearchResult> {
|
||||
(self.trigger_search)(cx).detach();
|
||||
self.results
|
||||
pub fn results(self, cx: &mut App) -> (Receiver<SearchResult>, Task<()>) {
|
||||
let task = (self.trigger_search)(cx);
|
||||
(self.results, task)
|
||||
}
|
||||
pub fn matching_buffers(self, cx: &mut App) -> Receiver<Entity<Buffer>> {
|
||||
(self.trigger_search)(cx).detach();
|
||||
self.matching_buffers
|
||||
|
||||
pub fn matching_buffers(self, cx: &mut App) -> (Receiver<Entity<Buffer>>, Task<()>) {
|
||||
let task = (self.trigger_search)(cx);
|
||||
(self.matching_buffers, task)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -683,29 +683,37 @@ impl RequestHandler<'_> {
|
||||
async fn handle_find_first_match(&self, mut entry: MatchingEntry) {
|
||||
_=maybe!(async move {
|
||||
let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
|
||||
let Some(file) = self.fs.context("Trying to query filesystem in remote project search")?.open_sync(&abs_path).await.log_err() else {
|
||||
return anyhow::Ok(());
|
||||
};
|
||||
|
||||
let mut file = BufReader::new(file);
|
||||
let file_start = file.fill_buf()?;
|
||||
let file = self
|
||||
.fs
|
||||
.context("Trying to query filesystem in remote project search")?
|
||||
.open_read(&abs_path)
|
||||
.await?;
|
||||
|
||||
if let Err(Some(starting_position)) =
|
||||
std::str::from_utf8(file_start).map_err(|e| e.error_len())
|
||||
{
|
||||
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
|
||||
// That way we can still match files in a streaming fashion without having look at "obviously binary" files.
|
||||
log::debug!(
|
||||
"Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
|
||||
);
|
||||
return Ok(());
|
||||
let mut file = futures::io::BufReader::new(file);
|
||||
|
||||
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
|
||||
// That way we can still match files in a streaming fashion without having look at "obviously binary" files.
|
||||
//
|
||||
// Use `fill_buf` so we can "peek" without consuming bytes, and reuse the same stream for detection.
|
||||
let buffer = file.fill_buf().await?;
|
||||
let prefix_len = buffer.len().min(8192);
|
||||
|
||||
if let Err(error) = std::str::from_utf8(&buffer[..prefix_len]) {
|
||||
if let Some(starting_position) = error.error_len() {
|
||||
log::debug!(
|
||||
"Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if self.query.detect(file).unwrap_or(false) {
|
||||
if self.query.detect(file).await.unwrap_or(false) {
|
||||
// Yes, we should scan the whole file.
|
||||
entry.should_scan_tx.send(entry.path).await?;
|
||||
}
|
||||
Ok(())
|
||||
|
||||
anyhow::Ok(())
|
||||
}).await;
|
||||
}
|
||||
|
||||
|
||||
@@ -10400,7 +10400,10 @@ async fn search(
|
||||
query: SearchQuery,
|
||||
cx: &mut gpui::TestAppContext,
|
||||
) -> Result<HashMap<String, Vec<Range<usize>>>> {
|
||||
let search_rx = project.update(cx, |project, cx| project.search(query, cx));
|
||||
let (search_rx, search_task) = project.update(cx, |project, cx| project.search(query, cx));
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
let _search_task = search_task;
|
||||
|
||||
let mut results = HashMap::default();
|
||||
while let Ok(search_result) = search_rx.recv().await {
|
||||
match search_result {
|
||||
|
||||
@@ -2,13 +2,13 @@ use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
|
||||
use anyhow::Result;
|
||||
use client::proto;
|
||||
use fancy_regex::{Captures, Regex, RegexBuilder};
|
||||
use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};
|
||||
use gpui::Entity;
|
||||
use itertools::Itertools as _;
|
||||
use language::{Buffer, BufferSnapshot, CharKind};
|
||||
use smol::future::yield_now;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
io::{BufRead, BufReader, Read},
|
||||
ops::Range,
|
||||
sync::{Arc, LazyLock},
|
||||
};
|
||||
@@ -326,37 +326,38 @@ impl SearchQuery {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn detect(
|
||||
&self,
|
||||
mut reader: BufReader<Box<dyn Read + Send + Sync>>,
|
||||
) -> Result<bool> {
|
||||
if self.as_str().is_empty() {
|
||||
pub(crate) async fn detect(&self, mut reader: impl AsyncBufRead + Unpin) -> Result<bool> {
|
||||
let query_str = self.as_str();
|
||||
let needle_len = query_str.len();
|
||||
if needle_len == 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let mut text = String::new();
|
||||
match self {
|
||||
Self::Text { search, .. } => {
|
||||
let mat = search.stream_find_iter(reader).next();
|
||||
match mat {
|
||||
Some(Ok(_)) => Ok(true),
|
||||
Some(Err(err)) => Err(err.into()),
|
||||
None => Ok(false),
|
||||
if query_str.contains('\n') {
|
||||
reader.read_to_string(&mut text).await?;
|
||||
Ok(search.find(&text).is_some())
|
||||
} else {
|
||||
while reader.read_line(&mut text).await? > 0 {
|
||||
if search.find(&text).is_some() {
|
||||
return Ok(true);
|
||||
}
|
||||
text.clear();
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
Self::Regex {
|
||||
regex, multiline, ..
|
||||
} => {
|
||||
if *multiline {
|
||||
let mut text = String::new();
|
||||
if let Err(err) = reader.read_to_string(&mut text) {
|
||||
Err(err.into())
|
||||
} else {
|
||||
Ok(regex.find(&text)?.is_some())
|
||||
}
|
||||
reader.read_to_string(&mut text).await?;
|
||||
Ok(regex.find(&text)?.is_some())
|
||||
} else {
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
if regex.find(&line)?.is_some() {
|
||||
while reader.read_line(&mut text).await? > 0 {
|
||||
if regex.find(&text)?.is_some() {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::{
|
||||
io::{BufRead, BufReader},
|
||||
path::{Path, PathBuf},
|
||||
pin::pin,
|
||||
sync::{Arc, atomic::AtomicUsize},
|
||||
@@ -8,7 +7,7 @@ use std::{
|
||||
use anyhow::{Context as _, Result, anyhow, bail};
|
||||
use collections::{HashMap, HashSet};
|
||||
use fs::{Fs, copy_recursive};
|
||||
use futures::{FutureExt, SinkExt, future::Shared};
|
||||
use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader};
|
||||
use gpui::{
|
||||
App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
|
||||
};
|
||||
@@ -1024,15 +1023,15 @@ impl WorktreeStore {
|
||||
let mut input = pin!(input);
|
||||
while let Some(mut entry) = input.next().await {
|
||||
let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
|
||||
let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
|
||||
let Some(file) = fs.open_read(&abs_path).await.log_err() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut file = BufReader::new(file);
|
||||
let file_start = file.fill_buf()?;
|
||||
let file_start = file.fill_buf().await?;
|
||||
|
||||
if let Err(Some(starting_position)) =
|
||||
std::str::from_utf8(file_start).map_err(|e| e.error_len())
|
||||
std::str::from_utf8(&file_start).map_err(|e| e.error_len())
|
||||
{
|
||||
// Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
|
||||
// That way we can still match files in a streaming fashion without having look at "obviously binary" files.
|
||||
@@ -1042,7 +1041,7 @@ impl WorktreeStore {
|
||||
continue;
|
||||
}
|
||||
|
||||
if query.detect(file).unwrap_or(false) {
|
||||
if query.detect(file).await.unwrap_or(false) {
|
||||
entry.respond.send(entry.path).await?
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,9 +102,11 @@ fn main() -> Result<(), anyhow::Error> {
|
||||
println!("Starting a project search");
|
||||
let timer = std::time::Instant::now();
|
||||
let mut first_match = None;
|
||||
let matches = project
|
||||
|
||||
let (matches, _search_task) = project
|
||||
.update(cx, |this, cx| this.search(query, cx))
|
||||
.unwrap();
|
||||
|
||||
let mut matched_files = 0;
|
||||
let mut matched_chunks = 0;
|
||||
while let Ok(match_result) = matches.recv().await {
|
||||
|
||||
@@ -771,7 +771,7 @@ impl HeadlessProject {
|
||||
message.query.context("missing query field")?,
|
||||
PathStyle::local(),
|
||||
)?;
|
||||
let results = this.update(&mut cx, |this, cx| {
|
||||
let (results, search_task) = this.update(&mut cx, |this, cx| {
|
||||
project::Search::local(
|
||||
this.fs.clone(),
|
||||
this.buffer_store.clone(),
|
||||
@@ -783,6 +783,10 @@ impl HeadlessProject {
|
||||
.matching_buffers(cx)
|
||||
})?;
|
||||
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
// We intentionally do not detach it.
|
||||
let _search_task = search_task;
|
||||
|
||||
let mut response = proto::FindSearchCandidatesResponse {
|
||||
buffer_ids: Vec::new(),
|
||||
};
|
||||
|
||||
@@ -194,7 +194,7 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
|
||||
cx.run_until_parked();
|
||||
|
||||
async fn do_search(project: &Entity<Project>, mut cx: TestAppContext) -> Entity<Buffer> {
|
||||
let receiver = project.update(&mut cx, |project, cx| {
|
||||
let (receiver, search_task) = project.update(&mut cx, |project, cx| {
|
||||
project.search(
|
||||
SearchQuery::text(
|
||||
"project",
|
||||
@@ -211,6 +211,10 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
|
||||
)
|
||||
});
|
||||
|
||||
// Keep the search task alive while we drain the receiver; dropping it cancels the search.
|
||||
// We intentionally do not detach it.
|
||||
let _search_task = search_task;
|
||||
|
||||
let first_response = receiver.recv().await.unwrap();
|
||||
let SearchResult::Buffer { buffer, .. } = first_response else {
|
||||
panic!("incorrect result");
|
||||
|
||||
@@ -303,7 +303,7 @@ impl ProjectSearch {
|
||||
}
|
||||
|
||||
fn search(&mut self, query: SearchQuery, cx: &mut Context<Self>) {
|
||||
let search = self.project.update(cx, |project, cx| {
|
||||
let (search, project_search_task) = self.project.update(cx, |project, cx| {
|
||||
project
|
||||
.search_history_mut(SearchInputKind::Query)
|
||||
.add(&mut self.search_history_cursor, query.as_str().to_string());
|
||||
@@ -326,6 +326,10 @@ impl ProjectSearch {
|
||||
self.active_query = Some(query);
|
||||
self.match_ranges.clear();
|
||||
self.pending_search = Some(cx.spawn(async move |project_search, cx| {
|
||||
// Keep the search task alive for the lifetime of this pending search task.
|
||||
// Dropping it is the cancellation mechanism; we intentionally do not detach it.
|
||||
let _project_search_task = project_search_task;
|
||||
|
||||
let mut matches = pin!(search.ready_chunks(1024));
|
||||
project_search
|
||||
.update(cx, |project_search, cx| {
|
||||
|
||||
Reference in New Issue
Block a user