Compare commits

...

3 Commits

Author SHA1 Message Date
Piotr Osiewicz
936cf3814d Merge branch main into remote-project-search-stream 2025-12-30 15:33:09 +01:00
Piotr Osiewicz
ffe2da5d17 Fix remote server build 2025-12-18 14:38:48 +01:00
Piotr Osiewicz
738dac0d6e project search: Stream result buffers sooner in remote scenarios
This improves latency, but kills throughput of remote search.

Co-authored-by: Smit Barmase <heysmitbarmase@gmail.com>
2025-12-18 12:31:03 +01:00
7 changed files with 189 additions and 40 deletions

View File

@@ -38,6 +38,8 @@ pub struct BufferStore {
downstream_client: Option<(AnyProtoClient, u64)>,
shared_buffers: HashMap<proto::PeerId, HashMap<BufferId, SharedBuffer>>,
non_searchable_buffers: HashSet<BufferId>,
project_search_chunks: HashMap<u64, smol::channel::Sender<BufferId>>,
pub next_project_search_id: u64,
}
#[derive(Hash, Eq, PartialEq, Clone)]
@@ -756,6 +758,8 @@ impl BufferStore {
loading_buffers: Default::default(),
non_searchable_buffers: Default::default(),
worktree_store,
project_search_chunks: Default::default(),
next_project_search_id: 0,
}
}
@@ -781,6 +785,8 @@ impl BufferStore {
shared_buffers: Default::default(),
non_searchable_buffers: Default::default(),
worktree_store,
project_search_chunks: Default::default(),
next_project_search_id: 0,
}
}
@@ -1673,6 +1679,51 @@ impl BufferStore {
}
serialized_transaction
}
pub(crate) fn register_project_search_result_handle(
&mut self,
handle: u64,
sender: smol::channel::Sender<BufferId>,
) {
self.project_search_chunks.insert(handle, sender);
}
pub(crate) async fn handle_find_search_candidates_chunk(
this: Entity<Self>,
envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
mut cx: AsyncApp,
) -> Result<proto::Ack> {
use proto::find_search_candidates_chunk::Variant;
let handle = envelope.payload.handle;
let buffer_ids = match envelope
.payload
.variant
.context("Expected non-null variant")?
{
Variant::Matches(find_search_candidates_matches) => find_search_candidates_matches
.buffer_ids
.into_iter()
.filter_map(|buffer_id| BufferId::new(buffer_id).ok())
.collect::<Vec<_>>(),
Variant::Done(_) => {
this.update(&mut cx, |this, _| {
this.project_search_chunks.remove(&handle)
})?;
return Ok(proto::Ack {});
}
};
let Some(sender) = this.read_with(&mut cx, |this, _| {
this.project_search_chunks.get(&handle).cloned()
})?
else {
return Ok(proto::Ack {});
};
for buffer_id in buffer_ids {
sender.send(buffer_id).await?;
}
Ok(proto::Ack {})
}
}
impl OpenBuffer {

View File

@@ -1489,6 +1489,7 @@ impl Project {
remote_proto.add_entity_request_handler(Self::handle_update_buffer_from_remote_server);
remote_proto.add_entity_request_handler(Self::handle_trust_worktrees);
remote_proto.add_entity_request_handler(Self::handle_restrict_worktrees);
remote_proto.add_entity_request_handler(Self::handle_find_search_candidates_chunk);
BufferStore::init(&remote_proto);
LspStore::init(&remote_proto);
@@ -4910,6 +4911,15 @@ impl Project {
Ok(proto::Ack {})
}
async fn handle_find_search_candidates_chunk(
this: Entity<Self>,
envelope: TypedEnvelope<proto::FindSearchCandidatesChunk>,
mut cx: AsyncApp,
) -> Result<proto::Ack> {
let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone())?;
BufferStore::handle_find_search_candidates_chunk(buffer_store, envelope, cx).await
}
async fn handle_update_buffer(
this: Entity<Self>,
envelope: TypedEnvelope<proto::UpdateBuffer>,
@@ -5020,23 +5030,55 @@ impl Project {
) -> Result<proto::FindSearchCandidatesResponse> {
let peer_id = envelope.original_sender_id()?;
let message = envelope.payload;
let project_id = message.project_id;
let path_style = this.read_with(&cx, |this, cx| this.path_style(cx))?;
let query =
SearchQuery::from_proto(message.query.context("missing query field")?, path_style)?;
let results = this.update(&mut cx, |this, cx| {
this.search_impl(query, cx).matching_buffers(cx)
let next_project_search_id = this.update(&mut cx, |this, cx| {
this.buffer_store.update(cx, |this, _| {
util::post_inc(&mut this.next_project_search_id)
})
})?;
let mut response = proto::FindSearchCandidatesResponse {
buffer_ids: Vec::new(),
let response = proto::FindSearchCandidatesResponse {
handle: next_project_search_id,
};
while let Ok(buffer) = results.rx.recv().await {
this.update(&mut cx, |this, cx| {
let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
response.buffer_ids.push(buffer_id.to_proto());
let client = this.read_with(&cx, |this, _| this.client())?;
cx.spawn(async move |cx| {
let results = this.update(cx, |this, cx| {
this.search_impl(query, cx).matching_buffers(cx)
})?;
}
let mut buffer_ids = vec![];
while let Ok(buffer) = results.rx.recv().await {
this.update(cx, |this, cx| {
let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx);
buffer_ids.push(buffer_id.to_proto());
})?;
let _ = client
.request(proto::FindSearchCandidatesChunk {
handle: next_project_search_id,
project_id,
variant: Some(proto::find_search_candidates_chunk::Variant::Matches(
proto::FindSearchCandidatesMatches {
buffer_ids: std::mem::take(&mut buffer_ids),
},
)),
})
.await?;
}
let _ = client
.request(proto::FindSearchCandidatesChunk {
handle: next_project_search_id,
project_id,
variant: Some(proto::find_search_candidates_chunk::Variant::Done(
proto::FindSearchCandidatesDone {},
)),
})
.await?;
anyhow::Ok(())
})
.detach();
Ok(response)
}

View File

@@ -22,7 +22,6 @@ use smol::{
future::FutureExt,
};
use text::BufferId;
use util::{ResultExt, maybe, paths::compare_rel_paths, rel_path::RelPath};
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeSettings};
@@ -274,9 +273,16 @@ impl Search {
.spawn(async move |cx| {
let _ = maybe!(async move {
let response = request.await?;
for buffer_id in response.buffer_ids {
let buffer_id = BufferId::new(buffer_id)?;
let buffer = weak_buffer_store
let (tx, rx) = unbounded();
weak_buffer_store.update(cx, |this, _| {
this.register_project_search_result_handle(
response.handle,
tx,
);
})?;
while let Ok(buffer_id) = rx.recv().await {
let buffer = buffer_store
.update(cx, |buffer_store, cx| {
buffer_store.wait_for_remote_buffer(buffer_id, cx)
})?

View File

@@ -311,5 +311,20 @@ message FindSearchCandidates {
}
message FindSearchCandidatesResponse {
repeated uint64 buffer_ids = 1;
uint64 handle = 1;
}
message FindSearchCandidatesDone {}
message FindSearchCandidatesMatches {
repeated uint64 buffer_ids = 1;
}
message FindSearchCandidatesChunk {
uint64 project_id = 1;
uint64 handle = 2;
oneof variant {
FindSearchCandidatesMatches matches = 3;
FindSearchCandidatesDone done = 4;
}
}

View File

@@ -451,7 +451,9 @@ message Envelope {
GitRemoveRemote git_remove_remote = 403;
TrustWorktrees trust_worktrees = 404;
RestrictWorktrees restrict_worktrees = 405; // current max
RestrictWorktrees restrict_worktrees = 405;
FindSearchCandidatesChunk find_search_candidates_chunk = 406; // current max
}
reserved 87 to 88, 396;

View File

@@ -345,7 +345,8 @@ messages!(
(RemoteStarted, Background),
(GitGetWorktrees, Background),
(GitWorktreesResponse, Background),
(GitCreateWorktree, Background)
(GitCreateWorktree, Background),
(FindSearchCandidatesChunk, Background)
);
request_messages!(
@@ -534,6 +535,7 @@ request_messages!(
(GitCreateWorktree, Ack),
(TrustWorktrees, Ack),
(RestrictWorktrees, Ack),
(FindSearchCandidatesChunk, Ack),
);
lsp_messages!(
@@ -709,6 +711,7 @@ entity_messages!(
GitCreateWorktree,
TrustWorktrees,
RestrictWorktrees,
FindSearchCandidatesChunk,
);
entity_messages!(

View File

@@ -775,33 +775,63 @@ impl HeadlessProject {
message.query.context("missing query field")?,
PathStyle::local(),
)?;
let results = this.update(&mut cx, |this, cx| {
project::Search::local(
this.fs.clone(),
this.buffer_store.clone(),
this.worktree_store.clone(),
message.limit as _,
cx,
)
.into_handle(query, cx)
.matching_buffers(cx)
let project_id = message.project_id;
let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
let next_project_search_id = this.update(&mut cx, |this, cx| {
this.buffer_store.update(cx, |this, _| {
util::post_inc(&mut this.next_project_search_id)
})
})?;
let mut response = proto::FindSearchCandidatesResponse {
buffer_ids: Vec::new(),
let response = proto::FindSearchCandidatesResponse {
handle: next_project_search_id,
};
let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
while let Ok(buffer) = results.rx.recv().await {
let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
response.buffer_ids.push(buffer_id.to_proto());
buffer_store
.update(&mut cx, |buffer_store, cx| {
buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
})?
let client = this.read_with(&cx, |this, _| this.session.clone())?;
cx.spawn(async move |cx| {
let results = this.update(cx, |this, cx| {
project::Search::local(
this.fs.clone(),
this.buffer_store.clone(),
this.worktree_store.clone(),
message.limit as _,
cx,
)
.into_handle(query, cx)
.matching_buffers(cx)
})?;
let mut buffer_ids = vec![];
while let Ok(buffer) = results.rx.recv().await {
buffer_store
.update(cx, |buffer_store, cx| {
buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
})?
.await?;
buffer_ids.push(buffer.read_with(cx, |this, _| this.remote_id().to_proto())?);
let _ = client
.request(proto::FindSearchCandidatesChunk {
handle: next_project_search_id,
project_id,
variant: Some(proto::find_search_candidates_chunk::Variant::Matches(
proto::FindSearchCandidatesMatches {
buffer_ids: std::mem::take(&mut buffer_ids),
},
)),
})
.await?;
}
let _ = client
.request(proto::FindSearchCandidatesChunk {
handle: next_project_search_id,
project_id,
variant: Some(proto::find_search_candidates_chunk::Variant::Done(
proto::FindSearchCandidatesDone {},
)),
})
.await?;
}
anyhow::Ok(())
})
.detach();
Ok(response)
}