Compare commits
13 Commits
fix-git-ht
...
summaries
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20c2a78510 | ||
|
|
f4c994d212 | ||
|
|
c261e1011b | ||
|
|
0edb105d1e | ||
|
|
3cef55d89f | ||
|
|
678b6120fd | ||
|
|
533aa2719c | ||
|
|
a07385e2a9 | ||
|
|
4e5d6fa435 | ||
|
|
31738f39f5 | ||
|
|
cede792786 | ||
|
|
1be22b5f17 | ||
|
|
030a2ae77b |
26
Cargo.lock
generated
26
Cargo.lock
generated
@@ -306,6 +306,9 @@ name = "arrayvec"
|
|||||||
version = "0.7.4"
|
version = "0.7.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
|
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "as-raw-xcb-connection"
|
name = "as-raw-xcb-connection"
|
||||||
@@ -1700,6 +1703,19 @@ dependencies = [
|
|||||||
"profiling",
|
"profiling",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "blake3"
|
||||||
|
version = "1.5.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210"
|
||||||
|
dependencies = [
|
||||||
|
"arrayref",
|
||||||
|
"arrayvec",
|
||||||
|
"cc",
|
||||||
|
"cfg-if",
|
||||||
|
"constant_time_eq",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "block"
|
name = "block"
|
||||||
version = "0.1.6"
|
version = "0.1.6"
|
||||||
@@ -2764,6 +2780,12 @@ dependencies = [
|
|||||||
"tiny-keccak",
|
"tiny-keccak",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "constant_time_eq"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "convert_case"
|
name = "convert_case"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
@@ -9568,6 +9590,8 @@ name = "semantic_index"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"arrayvec",
|
||||||
|
"blake3",
|
||||||
"client",
|
"client",
|
||||||
"clock",
|
"clock",
|
||||||
"collections",
|
"collections",
|
||||||
@@ -9580,6 +9604,7 @@ dependencies = [
|
|||||||
"heed",
|
"heed",
|
||||||
"http 0.1.0",
|
"http 0.1.0",
|
||||||
"language",
|
"language",
|
||||||
|
"language_model",
|
||||||
"languages",
|
"languages",
|
||||||
"log",
|
"log",
|
||||||
"open_ai",
|
"open_ai",
|
||||||
@@ -13753,6 +13778,7 @@ dependencies = [
|
|||||||
"audio",
|
"audio",
|
||||||
"auto_update",
|
"auto_update",
|
||||||
"backtrace",
|
"backtrace",
|
||||||
|
"blake3",
|
||||||
"breadcrumbs",
|
"breadcrumbs",
|
||||||
"call",
|
"call",
|
||||||
"channel",
|
"channel",
|
||||||
|
|||||||
@@ -284,6 +284,7 @@ zed_actions = { path = "crates/zed_actions" }
|
|||||||
alacritty_terminal = "0.23"
|
alacritty_terminal = "0.23"
|
||||||
any_vec = "0.13"
|
any_vec = "0.13"
|
||||||
anyhow = "1.0.57"
|
anyhow = "1.0.57"
|
||||||
|
arrayvec = { version = "0.7.4", features = ["serde"] }
|
||||||
ashpd = "0.9.1"
|
ashpd = "0.9.1"
|
||||||
async-compression = { version = "0.4", features = ["gzip", "futures-io"] }
|
async-compression = { version = "0.4", features = ["gzip", "futures-io"] }
|
||||||
async-dispatcher = { version = "0.1" }
|
async-dispatcher = { version = "0.1" }
|
||||||
@@ -298,6 +299,7 @@ bitflags = "2.6.0"
|
|||||||
blade-graphics = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
blade-graphics = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
||||||
blade-macros = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
blade-macros = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
||||||
blade-util = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
blade-util = { git = "https://github.com/zed-industries/blade", rev = "a477c2008db27db0b9f745715e119b3ee7ab7818" }
|
||||||
|
blake3 = "1.5.3"
|
||||||
cap-std = "3.0"
|
cap-std = "3.0"
|
||||||
cargo_toml = "0.20"
|
cargo_toml = "0.20"
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
|
|||||||
@@ -131,6 +131,26 @@ impl CompletionProvider {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stream_completion_bg(
|
||||||
|
// TODO consider unifying this with stream_completion, since they share so much code
|
||||||
|
// (design question: do we want to make a Spawn or perhaps Executor trait to abstract over bg and fg executors?)
|
||||||
|
&self,
|
||||||
|
request: LanguageModelRequest,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Task<Result<CompletionResponse>> {
|
||||||
|
let rate_limiter = self.request_limiter.clone();
|
||||||
|
let provider = self.provider.clone();
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
let lock = rate_limiter.acquire_arc().await;
|
||||||
|
let response = provider.read().stream_completion(request);
|
||||||
|
let response = response.await?;
|
||||||
|
Ok(CompletionResponse {
|
||||||
|
inner: response,
|
||||||
|
_lock: lock,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn complete(&self, request: LanguageModelRequest, cx: &AppContext) -> Task<Result<String>> {
|
pub fn complete(&self, request: LanguageModelRequest, cx: &AppContext) -> Task<Result<String>> {
|
||||||
let response = self.stream_completion(request, cx);
|
let response = self.stream_completion(request, cx);
|
||||||
cx.foreground_executor().spawn(async move {
|
cx.foreground_executor().spawn(async move {
|
||||||
|
|||||||
@@ -19,6 +19,8 @@ crate-type = ["bin"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
arrayvec.workspace = true
|
||||||
|
blake3.workspace = true
|
||||||
client.workspace = true
|
client.workspace = true
|
||||||
clock.workspace = true
|
clock.workspace = true
|
||||||
collections.workspace = true
|
collections.workspace = true
|
||||||
@@ -28,6 +30,7 @@ futures.workspace = true
|
|||||||
futures-batch.workspace = true
|
futures-batch.workspace = true
|
||||||
gpui.workspace = true
|
gpui.workspace = true
|
||||||
language.workspace = true
|
language.workspace = true
|
||||||
|
language_model.workspace = true
|
||||||
log.workspace = true
|
log.workspace = true
|
||||||
heed.workspace = true
|
heed.workspace = true
|
||||||
http.workspace = true
|
http.workspace = true
|
||||||
|
|||||||
@@ -12,6 +12,12 @@ use futures::{future::BoxFuture, FutureExt};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{fmt, future};
|
use std::{fmt, future};
|
||||||
|
|
||||||
|
/// Trait for embedding providers. Texts in, vectors out.
|
||||||
|
pub trait EmbeddingProvider: Sync + Send {
|
||||||
|
fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>>;
|
||||||
|
fn batch_size(&self) -> usize;
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
pub struct Embedding(Vec<f32>);
|
pub struct Embedding(Vec<f32>);
|
||||||
|
|
||||||
@@ -68,12 +74,6 @@ impl fmt::Display for Embedding {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait for embedding providers. Texts in, vectors out.
|
|
||||||
pub trait EmbeddingProvider: Sync + Send {
|
|
||||||
fn embed<'a>(&'a self, texts: &'a [TextToEmbed<'a>]) -> BoxFuture<'a, Result<Vec<Embedding>>>;
|
|
||||||
fn batch_size(&self) -> usize;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TextToEmbed<'a> {
|
pub struct TextToEmbed<'a> {
|
||||||
pub text: &'a str,
|
pub text: &'a str,
|
||||||
|
|||||||
470
crates/semantic_index/src/embedding_index.rs
Normal file
470
crates/semantic_index/src/embedding_index.rs
Normal file
@@ -0,0 +1,470 @@
|
|||||||
|
use crate::{
|
||||||
|
chunking::{self, Chunk},
|
||||||
|
embedding::{Embedding, EmbeddingProvider, TextToEmbed},
|
||||||
|
indexing::{IndexingEntryHandle, IndexingEntrySet},
|
||||||
|
};
|
||||||
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
|
use collections::Bound;
|
||||||
|
use fs::Fs;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
use futures_batch::ChunksTimeoutStreamExt;
|
||||||
|
use gpui::{AppContext, Model, Task};
|
||||||
|
use heed::types::{SerdeBincode, Str};
|
||||||
|
use language::LanguageRegistry;
|
||||||
|
use log;
|
||||||
|
use project::{Entry, UpdatedEntriesSet, Worktree};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use smol::channel;
|
||||||
|
use std::{
|
||||||
|
cmp::Ordering,
|
||||||
|
future::Future,
|
||||||
|
iter,
|
||||||
|
path::Path,
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, SystemTime},
|
||||||
|
};
|
||||||
|
use util::ResultExt;
|
||||||
|
use worktree::Snapshot;
|
||||||
|
|
||||||
|
pub struct EmbeddingIndex {
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
language_registry: Arc<LanguageRegistry>,
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EmbeddingIndex {
|
||||||
|
pub fn new(
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
embedding_db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
|
||||||
|
language_registry: Arc<LanguageRegistry>,
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
worktree,
|
||||||
|
fs,
|
||||||
|
db_connection,
|
||||||
|
db: embedding_db,
|
||||||
|
language_registry,
|
||||||
|
embedding_provider,
|
||||||
|
entry_ids_being_indexed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn db(&self) -> &heed::Database<Str, SerdeBincode<EmbeddedFile>> {
|
||||||
|
&self.db
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_entries_changed_on_disk(
|
||||||
|
&self,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> impl Future<Output = Result<()>> {
|
||||||
|
let worktree = self.worktree.read(cx).snapshot();
|
||||||
|
let worktree_abs_path = worktree.abs_path().clone();
|
||||||
|
let scan = self.scan_entries(worktree, cx);
|
||||||
|
let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx);
|
||||||
|
let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx);
|
||||||
|
let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);
|
||||||
|
async move {
|
||||||
|
futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_updated_entries(
|
||||||
|
&self,
|
||||||
|
updated_entries: UpdatedEntriesSet,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> impl Future<Output = Result<()>> {
|
||||||
|
let worktree = self.worktree.read(cx).snapshot();
|
||||||
|
let worktree_abs_path = worktree.abs_path().clone();
|
||||||
|
let scan = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
|
||||||
|
let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx);
|
||||||
|
let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx);
|
||||||
|
let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);
|
||||||
|
async move {
|
||||||
|
futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> ScanEntries {
|
||||||
|
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||||
|
let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||||
|
let db_connection = self.db_connection.clone();
|
||||||
|
let db = self.db;
|
||||||
|
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
let txn = db_connection
|
||||||
|
.read_txn()
|
||||||
|
.context("failed to create read transaction")?;
|
||||||
|
let mut db_entries = db
|
||||||
|
.iter(&txn)
|
||||||
|
.context("failed to create iterator")?
|
||||||
|
.move_between_keys()
|
||||||
|
.peekable();
|
||||||
|
|
||||||
|
let mut deletion_range: Option<(Bound<&str>, Bound<&str>)> = None;
|
||||||
|
for entry in worktree.files(false, 0) {
|
||||||
|
let entry_db_key = db_key_for_path(&entry.path);
|
||||||
|
|
||||||
|
let mut saved_mtime = None;
|
||||||
|
while let Some(db_entry) = db_entries.peek() {
|
||||||
|
match db_entry {
|
||||||
|
Ok((db_path, db_embedded_file)) => match (*db_path).cmp(&entry_db_key) {
|
||||||
|
Ordering::Less => {
|
||||||
|
if let Some(deletion_range) = deletion_range.as_mut() {
|
||||||
|
deletion_range.1 = Bound::Included(db_path);
|
||||||
|
} else {
|
||||||
|
deletion_range =
|
||||||
|
Some((Bound::Included(db_path), Bound::Included(db_path)));
|
||||||
|
}
|
||||||
|
|
||||||
|
db_entries.next();
|
||||||
|
}
|
||||||
|
Ordering::Equal => {
|
||||||
|
if let Some(deletion_range) = deletion_range.take() {
|
||||||
|
deleted_entry_ranges_tx
|
||||||
|
.send((
|
||||||
|
deletion_range.0.map(ToString::to_string),
|
||||||
|
deletion_range.1.map(ToString::to_string),
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
saved_mtime = db_embedded_file.mtime;
|
||||||
|
db_entries.next();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ordering::Greater => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => return Err(db_entries.next().unwrap().unwrap_err())?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.mtime != saved_mtime {
|
||||||
|
let handle = entries_being_indexed.insert(entry.id);
|
||||||
|
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(db_entry) = db_entries.next() {
|
||||||
|
let (db_path, _) = db_entry?;
|
||||||
|
deleted_entry_ranges_tx
|
||||||
|
.send((Bound::Included(db_path.to_string()), Bound::Unbounded))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ScanEntries {
|
||||||
|
updated_entries: updated_entries_rx,
|
||||||
|
deleted_entry_ranges: deleted_entry_ranges_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_updated_entries(
|
||||||
|
&self,
|
||||||
|
worktree: Snapshot,
|
||||||
|
updated_entries: UpdatedEntriesSet,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> ScanEntries {
|
||||||
|
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||||
|
let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||||
|
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
for (path, entry_id, status) in updated_entries.iter() {
|
||||||
|
match status {
|
||||||
|
project::PathChange::Added
|
||||||
|
| project::PathChange::Updated
|
||||||
|
| project::PathChange::AddedOrUpdated => {
|
||||||
|
if let Some(entry) = worktree.entry_for_id(*entry_id) {
|
||||||
|
if entry.is_file() {
|
||||||
|
let handle = entries_being_indexed.insert(entry.id);
|
||||||
|
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
project::PathChange::Removed => {
|
||||||
|
let db_path = db_key_for_path(path);
|
||||||
|
deleted_entry_ranges_tx
|
||||||
|
.send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
project::PathChange::Loaded => {
|
||||||
|
// Do nothing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ScanEntries {
|
||||||
|
updated_entries: updated_entries_rx,
|
||||||
|
deleted_entry_ranges: deleted_entry_ranges_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn chunk_files(
|
||||||
|
&self,
|
||||||
|
worktree_abs_path: Arc<Path>,
|
||||||
|
entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> ChunkFiles {
|
||||||
|
let language_registry = self.language_registry.clone();
|
||||||
|
let fs = self.fs.clone();
|
||||||
|
let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048);
|
||||||
|
let task = cx.spawn(|cx| async move {
|
||||||
|
cx.background_executor()
|
||||||
|
.scoped(|cx| {
|
||||||
|
for _ in 0..cx.num_cpus() {
|
||||||
|
cx.spawn(async {
|
||||||
|
while let Ok((entry, handle)) = entries.recv().await {
|
||||||
|
let entry_abs_path = worktree_abs_path.join(&entry.path);
|
||||||
|
let Some(text) = fs
|
||||||
|
.load(&entry_abs_path)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!("failed to read path {entry_abs_path:?}")
|
||||||
|
})
|
||||||
|
.log_err()
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let language = language_registry
|
||||||
|
.language_for_file_path(&entry.path)
|
||||||
|
.await
|
||||||
|
.ok();
|
||||||
|
let chunked_file = ChunkedFile {
|
||||||
|
chunks: chunking::chunk_text(
|
||||||
|
&text,
|
||||||
|
language.as_ref(),
|
||||||
|
&entry.path,
|
||||||
|
),
|
||||||
|
handle,
|
||||||
|
path: entry.path,
|
||||||
|
mtime: entry.mtime,
|
||||||
|
text,
|
||||||
|
};
|
||||||
|
|
||||||
|
if chunked_files_tx.send(chunked_file).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ChunkFiles {
|
||||||
|
files: chunked_files_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn embed_files(
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
chunked_files: channel::Receiver<ChunkedFile>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> EmbedFiles {
|
||||||
|
let embedding_provider = embedding_provider.clone();
|
||||||
|
let (embedded_files_tx, embedded_files_rx) = channel::bounded(512);
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
let mut chunked_file_batches =
|
||||||
|
chunked_files.chunks_timeout(512, Duration::from_secs(2));
|
||||||
|
while let Some(chunked_files) = chunked_file_batches.next().await {
|
||||||
|
// View the batch of files as a vec of chunks
|
||||||
|
// Flatten out to a vec of chunks that we can subdivide into batch sized pieces
|
||||||
|
// Once those are done, reassemble them back into the files in which they belong
|
||||||
|
// If any embeddings fail for a file, the entire file is discarded
|
||||||
|
|
||||||
|
let chunks: Vec<TextToEmbed> = chunked_files
|
||||||
|
.iter()
|
||||||
|
.flat_map(|file| {
|
||||||
|
file.chunks.iter().map(|chunk| TextToEmbed {
|
||||||
|
text: &file.text[chunk.range.clone()],
|
||||||
|
digest: chunk.digest,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let mut embeddings: Vec<Option<Embedding>> = Vec::new();
|
||||||
|
for embedding_batch in chunks.chunks(embedding_provider.batch_size()) {
|
||||||
|
if let Some(batch_embeddings) =
|
||||||
|
embedding_provider.embed(embedding_batch).await.log_err()
|
||||||
|
{
|
||||||
|
if batch_embeddings.len() == embedding_batch.len() {
|
||||||
|
embeddings.extend(batch_embeddings.into_iter().map(Some));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
log::error!(
|
||||||
|
"embedding provider returned unexpected embedding count {}, expected {}",
|
||||||
|
batch_embeddings.len(), embedding_batch.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
embeddings.extend(iter::repeat(None).take(embedding_batch.len()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut embeddings = embeddings.into_iter();
|
||||||
|
for chunked_file in chunked_files {
|
||||||
|
let mut embedded_file = EmbeddedFile {
|
||||||
|
path: chunked_file.path,
|
||||||
|
mtime: chunked_file.mtime,
|
||||||
|
chunks: Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut embedded_all_chunks = true;
|
||||||
|
for (chunk, embedding) in
|
||||||
|
chunked_file.chunks.into_iter().zip(embeddings.by_ref())
|
||||||
|
{
|
||||||
|
if let Some(embedding) = embedding {
|
||||||
|
embedded_file
|
||||||
|
.chunks
|
||||||
|
.push(EmbeddedChunk { chunk, embedding });
|
||||||
|
} else {
|
||||||
|
embedded_all_chunks = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if embedded_all_chunks {
|
||||||
|
embedded_files_tx
|
||||||
|
.send((embedded_file, chunked_file.handle))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
EmbedFiles {
|
||||||
|
files: embedded_files_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn persist_embeddings(
|
||||||
|
&self,
|
||||||
|
mut deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
|
||||||
|
embedded_files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Task<Result<()>> {
|
||||||
|
let db_connection = self.db_connection.clone();
|
||||||
|
let db = self.db;
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
while let Some(deletion_range) = deleted_entry_ranges.next().await {
|
||||||
|
let mut txn = db_connection.write_txn()?;
|
||||||
|
let start = deletion_range.0.as_ref().map(|start| start.as_str());
|
||||||
|
let end = deletion_range.1.as_ref().map(|end| end.as_str());
|
||||||
|
log::debug!("deleting embeddings in range {:?}", &(start, end));
|
||||||
|
db.delete_range(&mut txn, &(start, end))?;
|
||||||
|
txn.commit()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut embedded_files = embedded_files.chunks_timeout(4096, Duration::from_secs(2));
|
||||||
|
while let Some(embedded_files) = embedded_files.next().await {
|
||||||
|
let mut txn = db_connection.write_txn()?;
|
||||||
|
for (file, _) in &embedded_files {
|
||||||
|
log::debug!("saving embedding for file {:?}", file.path);
|
||||||
|
let key = db_key_for_path(&file.path);
|
||||||
|
db.put(&mut txn, &key, file)?;
|
||||||
|
}
|
||||||
|
txn.commit()?;
|
||||||
|
|
||||||
|
drop(embedded_files);
|
||||||
|
log::debug!("committed");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lists the path of every file stored in the embedding database.
///
/// Runs on the background executor inside a single read transaction and
/// fails on the first row that cannot be read.
pub fn paths(&self, cx: &AppContext) -> Task<Result<Vec<Arc<Path>>>> {
    let db = self.db;
    let connection = self.db_connection.clone();
    cx.background_executor().spawn(async move {
        let tx = connection
            .read_txn()
            .context("failed to create read transaction")?;
        let mut paths = Vec::new();
        for entry in db.iter(&tx)? {
            let (_, embedded_file) = entry?;
            paths.push(embedded_file.path.clone());
        }
        drop(tx);
        Ok(paths)
    })
}
|
||||||
|
|
||||||
|
pub fn chunks_for_path(
|
||||||
|
&self,
|
||||||
|
path: Arc<Path>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Task<Result<Vec<EmbeddedChunk>>> {
|
||||||
|
let connection = self.db_connection.clone();
|
||||||
|
let db = self.db;
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
let tx = connection
|
||||||
|
.read_txn()
|
||||||
|
.context("failed to create read transaction")?;
|
||||||
|
Ok(db
|
||||||
|
.get(&tx, &db_key_for_path(&path))?
|
||||||
|
.ok_or_else(|| anyhow!("no such path"))?
|
||||||
|
.chunks
|
||||||
|
.clone())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ScanEntries {
|
||||||
|
updated_entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||||
|
deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ChunkFiles {
|
||||||
|
files: channel::Receiver<ChunkedFile>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ChunkedFile {
|
||||||
|
pub path: Arc<Path>,
|
||||||
|
pub mtime: Option<SystemTime>,
|
||||||
|
pub handle: IndexingEntryHandle,
|
||||||
|
pub text: String,
|
||||||
|
pub chunks: Vec<Chunk>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EmbedFiles {
|
||||||
|
pub files: channel::Receiver<(EmbeddedFile, IndexingEntryHandle)>,
|
||||||
|
pub task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct EmbeddedFile {
|
||||||
|
pub path: Arc<Path>,
|
||||||
|
pub mtime: Option<SystemTime>,
|
||||||
|
pub chunks: Vec<EmbeddedChunk>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct EmbeddedChunk {
|
||||||
|
pub chunk: Chunk,
|
||||||
|
pub embedding: Embedding,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds the database key for `path`.
///
/// The key is the lossy UTF-8 rendering of the path with every `/` replaced
/// by a NUL character.
// NOTE(review): presumably NUL is used so that keys sort in the same order as
// the worktree traversal that `scan_entries` walks in lockstep — confirm.
fn db_key_for_path(path: &Arc<Path>) -> String {
    let lossy = path.to_string_lossy();
    lossy.replace('/', "\0")
}
|
||||||
49
crates/semantic_index/src/indexing.rs
Normal file
49
crates/semantic_index/src/indexing.rs
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
use collections::HashSet;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use project::ProjectEntryId;
|
||||||
|
use smol::channel;
|
||||||
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
|
/// The set of entries that are currently being indexed.
|
||||||
|
pub struct IndexingEntrySet {
|
||||||
|
entry_ids: Mutex<HashSet<ProjectEntryId>>,
|
||||||
|
tx: channel::Sender<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When dropped, removes the entry from the set of entries that are being indexed.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct IndexingEntryHandle {
|
||||||
|
entry_id: ProjectEntryId,
|
||||||
|
set: Weak<IndexingEntrySet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexingEntrySet {
|
||||||
|
pub fn new(tx: channel::Sender<()>) -> Self {
|
||||||
|
Self {
|
||||||
|
entry_ids: Default::default(),
|
||||||
|
tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(self: &Arc<Self>, entry_id: ProjectEntryId) -> IndexingEntryHandle {
|
||||||
|
self.entry_ids.lock().insert(entry_id);
|
||||||
|
self.tx.send_blocking(()).ok();
|
||||||
|
IndexingEntryHandle {
|
||||||
|
entry_id,
|
||||||
|
set: Arc::downgrade(self),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.entry_ids.lock().len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for IndexingEntryHandle {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(set) = self.set.upgrade() {
|
||||||
|
set.tx.send_blocking(()).ok();
|
||||||
|
set.entry_ids.lock().remove(&self.entry_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
387
crates/semantic_index/src/project_index.rs
Normal file
387
crates/semantic_index/src/project_index.rs
Normal file
@@ -0,0 +1,387 @@
|
|||||||
|
use crate::{
|
||||||
|
embedding::{EmbeddingProvider, TextToEmbed},
|
||||||
|
worktree_index::{WorktreeIndex, WorktreeIndexHandle},
|
||||||
|
};
|
||||||
|
use anyhow::{anyhow, Context, Result};
|
||||||
|
use collections::HashMap;
|
||||||
|
use fs::Fs;
|
||||||
|
use futures::{stream::StreamExt, FutureExt};
|
||||||
|
use gpui::{
|
||||||
|
AppContext, Entity, EntityId, EventEmitter, Model, ModelContext, Subscription, Task, WeakModel,
|
||||||
|
};
|
||||||
|
use language::LanguageRegistry;
|
||||||
|
use log;
|
||||||
|
use project::{Project, Worktree, WorktreeId};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use smol::channel;
|
||||||
|
use std::{cmp::Ordering, num::NonZeroUsize, ops::Range, path::Path, sync::Arc};
|
||||||
|
use util::ResultExt;
|
||||||
|
|
||||||
|
pub struct SearchResult {
|
||||||
|
pub worktree: Model<Worktree>,
|
||||||
|
pub path: Arc<Path>,
|
||||||
|
pub range: Range<usize>,
|
||||||
|
pub score: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WorktreeSearchResult {
|
||||||
|
pub worktree_id: WorktreeId,
|
||||||
|
pub path: Arc<Path>,
|
||||||
|
pub range: Range<usize>,
|
||||||
|
pub score: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub enum Status {
|
||||||
|
Idle,
|
||||||
|
Loading,
|
||||||
|
Scanning { remaining_count: NonZeroUsize },
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ProjectIndex {
|
||||||
|
db_connection: heed::Env,
|
||||||
|
project: WeakModel<Project>,
|
||||||
|
worktree_indices: HashMap<EntityId, WorktreeIndexHandle>,
|
||||||
|
language_registry: Arc<LanguageRegistry>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
last_status: Status,
|
||||||
|
status_tx: channel::Sender<()>,
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
_maintain_status: Task<()>,
|
||||||
|
_subscription: Subscription,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProjectIndex {
|
||||||
|
pub fn new(
|
||||||
|
project: Model<Project>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
cx: &mut ModelContext<Self>,
|
||||||
|
) -> Self {
|
||||||
|
let language_registry = project.read(cx).languages().clone();
|
||||||
|
let fs = project.read(cx).fs().clone();
|
||||||
|
let (status_tx, mut status_rx) = channel::unbounded();
|
||||||
|
let mut this = ProjectIndex {
|
||||||
|
db_connection,
|
||||||
|
project: project.downgrade(),
|
||||||
|
worktree_indices: HashMap::default(),
|
||||||
|
language_registry,
|
||||||
|
fs,
|
||||||
|
status_tx,
|
||||||
|
last_status: Status::Idle,
|
||||||
|
embedding_provider,
|
||||||
|
_subscription: cx.subscribe(&project, Self::handle_project_event),
|
||||||
|
_maintain_status: cx.spawn(|this, mut cx| async move {
|
||||||
|
while status_rx.next().await.is_some() {
|
||||||
|
if this
|
||||||
|
.update(&mut cx, |this, cx| this.update_status(cx))
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
this.update_worktree_indices(cx);
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn status(&self) -> Status {
|
||||||
|
self.last_status
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a weak handle to the indexed project.
pub fn project(&self) -> WeakModel<Project> {
    let project = &self.project;
    project.clone()
}
|
||||||
|
|
||||||
|
pub fn fs(&self) -> Arc<dyn Fs> {
|
||||||
|
self.fs.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_project_event(
|
||||||
|
&mut self,
|
||||||
|
_: Model<Project>,
|
||||||
|
event: &project::Event,
|
||||||
|
cx: &mut ModelContext<Self>,
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => {
|
||||||
|
self.update_worktree_indices(cx);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_worktree_indices(&mut self, cx: &mut ModelContext<Self>) {
|
||||||
|
let Some(project) = self.project.upgrade() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let worktrees = project
|
||||||
|
.read(cx)
|
||||||
|
.visible_worktrees(cx)
|
||||||
|
.filter_map(|worktree| {
|
||||||
|
if worktree.read(cx).is_local() {
|
||||||
|
Some((worktree.entity_id(), worktree))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
self.worktree_indices
|
||||||
|
.retain(|worktree_id, _| worktrees.contains_key(worktree_id));
|
||||||
|
for (worktree_id, worktree) in worktrees {
|
||||||
|
self.worktree_indices.entry(worktree_id).or_insert_with(|| {
|
||||||
|
let worktree_index = WorktreeIndex::load(
|
||||||
|
worktree.clone(),
|
||||||
|
self.db_connection.clone(),
|
||||||
|
self.language_registry.clone(),
|
||||||
|
self.fs.clone(),
|
||||||
|
self.status_tx.clone(),
|
||||||
|
self.embedding_provider.clone(),
|
||||||
|
cx,
|
||||||
|
);
|
||||||
|
|
||||||
|
let load_worktree = cx.spawn(|this, mut cx| async move {
|
||||||
|
let result = match worktree_index.await {
|
||||||
|
Ok(worktree_index) => {
|
||||||
|
this.update(&mut cx, |this, _| {
|
||||||
|
this.worktree_indices.insert(
|
||||||
|
worktree_id,
|
||||||
|
WorktreeIndexHandle::Loaded {
|
||||||
|
index: worktree_index.clone(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
})?;
|
||||||
|
Ok(worktree_index)
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
this.update(&mut cx, |this, _cx| {
|
||||||
|
this.worktree_indices.remove(&worktree_id)
|
||||||
|
})?;
|
||||||
|
Err(Arc::new(error))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.update(&mut cx, |this, cx| this.update_status(cx))?;
|
||||||
|
|
||||||
|
result
|
||||||
|
});
|
||||||
|
|
||||||
|
WorktreeIndexHandle::Loading {
|
||||||
|
index: load_worktree.shared(),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
self.update_status(cx);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_status(&mut self, cx: &mut ModelContext<Self>) {
|
||||||
|
let mut indexing_count = 0;
|
||||||
|
let mut any_loading = false;
|
||||||
|
|
||||||
|
for index in self.worktree_indices.values_mut() {
|
||||||
|
match index {
|
||||||
|
WorktreeIndexHandle::Loading { .. } => {
|
||||||
|
any_loading = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
WorktreeIndexHandle::Loaded { index, .. } => {
|
||||||
|
indexing_count += index.read(cx).entry_ids_being_indexed().len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let status = if any_loading {
|
||||||
|
Status::Loading
|
||||||
|
} else if let Some(remaining_count) = NonZeroUsize::new(indexing_count) {
|
||||||
|
Status::Scanning { remaining_count }
|
||||||
|
} else {
|
||||||
|
Status::Idle
|
||||||
|
};
|
||||||
|
|
||||||
|
if status != self.last_status {
|
||||||
|
self.last_status = status;
|
||||||
|
cx.emit(status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn search(
|
||||||
|
&self,
|
||||||
|
query: String,
|
||||||
|
limit: usize,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Task<Result<Vec<SearchResult>>> {
|
||||||
|
let (chunks_tx, chunks_rx) = channel::bounded(1024);
|
||||||
|
let mut worktree_scan_tasks = Vec::new();
|
||||||
|
for worktree_index in self.worktree_indices.values() {
|
||||||
|
let worktree_index = worktree_index.clone();
|
||||||
|
let chunks_tx = chunks_tx.clone();
|
||||||
|
worktree_scan_tasks.push(cx.spawn(|cx| async move {
|
||||||
|
let index = match worktree_index {
|
||||||
|
WorktreeIndexHandle::Loading { index } => {
|
||||||
|
index.clone().await.map_err(|error| anyhow!(error))?
|
||||||
|
}
|
||||||
|
WorktreeIndexHandle::Loaded { index } => index.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
index
|
||||||
|
.read_with(&cx, |index, cx| {
|
||||||
|
let worktree_id = index.worktree().read(cx).id();
|
||||||
|
let db_connection = index.db_connection().clone();
|
||||||
|
let db = index.embedding_index().db().clone();
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
let txn = db_connection
|
||||||
|
.read_txn()
|
||||||
|
.context("failed to create read transaction")?;
|
||||||
|
let db_entries = db.iter(&txn).context("failed to iterate database")?;
|
||||||
|
for db_entry in db_entries {
|
||||||
|
let (_key, db_embedded_file) = db_entry?;
|
||||||
|
for chunk in db_embedded_file.chunks {
|
||||||
|
chunks_tx
|
||||||
|
.send((worktree_id, db_embedded_file.path.clone(), chunk))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
anyhow::Ok(())
|
||||||
|
})
|
||||||
|
})?
|
||||||
|
.await
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
drop(chunks_tx);
|
||||||
|
|
||||||
|
let project = self.project.clone();
|
||||||
|
let embedding_provider = self.embedding_provider.clone();
|
||||||
|
cx.spawn(|cx| async move {
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
let embedding_query_start = std::time::Instant::now();
|
||||||
|
log::info!("Searching for {query}");
|
||||||
|
|
||||||
|
let query_embeddings = embedding_provider
|
||||||
|
.embed(&[TextToEmbed::new(&query)])
|
||||||
|
.await?;
|
||||||
|
let query_embedding = query_embeddings
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| anyhow!("no embedding for query"))?;
|
||||||
|
|
||||||
|
let mut results_by_worker = Vec::new();
|
||||||
|
for _ in 0..cx.background_executor().num_cpus() {
|
||||||
|
results_by_worker.push(Vec::<WorktreeSearchResult>::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
let search_start = std::time::Instant::now();
|
||||||
|
|
||||||
|
cx.background_executor()
|
||||||
|
.scoped(|cx| {
|
||||||
|
for results in results_by_worker.iter_mut() {
|
||||||
|
cx.spawn(async {
|
||||||
|
while let Ok((worktree_id, path, chunk)) = chunks_rx.recv().await {
|
||||||
|
let score = chunk.embedding.similarity(&query_embedding);
|
||||||
|
let ix = match results.binary_search_by(|probe| {
|
||||||
|
score.partial_cmp(&probe.score).unwrap_or(Ordering::Equal)
|
||||||
|
}) {
|
||||||
|
Ok(ix) | Err(ix) => ix,
|
||||||
|
};
|
||||||
|
results.insert(
|
||||||
|
ix,
|
||||||
|
WorktreeSearchResult {
|
||||||
|
worktree_id,
|
||||||
|
path: path.clone(),
|
||||||
|
range: chunk.chunk.range.clone(),
|
||||||
|
score,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
results.truncate(limit);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
for scan_task in futures::future::join_all(worktree_scan_tasks).await {
|
||||||
|
scan_task.log_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
project.read_with(&cx, |project, cx| {
|
||||||
|
let mut search_results = Vec::with_capacity(results_by_worker.len() * limit);
|
||||||
|
for worker_results in results_by_worker {
|
||||||
|
search_results.extend(worker_results.into_iter().filter_map(|result| {
|
||||||
|
Some(SearchResult {
|
||||||
|
worktree: project.worktree_for_id(result.worktree_id, cx)?,
|
||||||
|
path: result.path,
|
||||||
|
range: result.range,
|
||||||
|
score: result.score,
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
search_results.sort_unstable_by(|a, b| {
|
||||||
|
b.score.partial_cmp(&a.score).unwrap_or(Ordering::Equal)
|
||||||
|
});
|
||||||
|
search_results.truncate(limit);
|
||||||
|
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
{
|
||||||
|
let search_elapsed = search_start.elapsed();
|
||||||
|
log::debug!(
|
||||||
|
"searched {} entries in {:?}",
|
||||||
|
search_results.len(),
|
||||||
|
search_elapsed
|
||||||
|
);
|
||||||
|
let embedding_query_elapsed = embedding_query_start.elapsed();
|
||||||
|
log::debug!("embedding query took {:?}", embedding_query_elapsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
search_results
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test helper: sums `path_count()` over every loaded worktree index.
/// Indices that are still loading contribute nothing.
#[cfg(test)]
pub fn path_count(&self, cx: &AppContext) -> Result<u64> {
    let mut total = 0;
    for handle in self.worktree_indices.values() {
        let WorktreeIndexHandle::Loaded { index, .. } = handle else {
            continue;
        };
        total += index.read(cx).path_count()?;
    }
    Ok(total)
}
|
||||||
|
|
||||||
|
pub(crate) fn worktree_index(
|
||||||
|
&self,
|
||||||
|
worktree_id: WorktreeId,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Option<Model<WorktreeIndex>> {
|
||||||
|
for index in self.worktree_indices.values() {
|
||||||
|
if let WorktreeIndexHandle::Loaded { index, .. } = index {
|
||||||
|
if index.read(cx).worktree().read(cx).id() == worktree_id {
|
||||||
|
return Some(index.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn worktree_indices(&self, cx: &AppContext) -> Vec<Model<WorktreeIndex>> {
|
||||||
|
let mut result = self
|
||||||
|
.worktree_indices
|
||||||
|
.values()
|
||||||
|
.filter_map(|index| {
|
||||||
|
if let WorktreeIndexHandle::Loaded { index, .. } = index {
|
||||||
|
Some(index.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
result.sort_by_key(|index| index.read(cx).worktree().read(cx).id());
|
||||||
|
result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventEmitter<Status> for ProjectIndex {}
|
||||||
@@ -55,8 +55,12 @@ impl ProjectIndexDebugView {
|
|||||||
for index in worktree_indices {
|
for index in worktree_indices {
|
||||||
let (root_path, worktree_id, worktree_paths) =
|
let (root_path, worktree_id, worktree_paths) =
|
||||||
index.read_with(&cx, |index, cx| {
|
index.read_with(&cx, |index, cx| {
|
||||||
let worktree = index.worktree.read(cx);
|
let worktree = index.worktree().read(cx);
|
||||||
(worktree.abs_path(), worktree.id(), index.paths(cx))
|
(
|
||||||
|
worktree.abs_path(),
|
||||||
|
worktree.id(),
|
||||||
|
index.embedding_index().paths(cx),
|
||||||
|
)
|
||||||
})?;
|
})?;
|
||||||
rows.push(Row::Worktree(root_path));
|
rows.push(Row::Worktree(root_path));
|
||||||
rows.extend(
|
rows.extend(
|
||||||
@@ -82,10 +86,12 @@ impl ProjectIndexDebugView {
|
|||||||
cx: &mut ViewContext<Self>,
|
cx: &mut ViewContext<Self>,
|
||||||
) -> Option<()> {
|
) -> Option<()> {
|
||||||
let project_index = self.index.read(cx);
|
let project_index = self.index.read(cx);
|
||||||
let fs = project_index.fs.clone();
|
let fs = project_index.fs().clone();
|
||||||
let worktree_index = project_index.worktree_index(worktree_id, cx)?.read(cx);
|
let worktree_index = project_index.worktree_index(worktree_id, cx)?.read(cx);
|
||||||
let root_path = worktree_index.worktree.read(cx).abs_path();
|
let root_path = worktree_index.worktree().read(cx).abs_path();
|
||||||
let chunks = worktree_index.chunks_for_path(file_path.clone(), cx);
|
let chunks = worktree_index
|
||||||
|
.embedding_index()
|
||||||
|
.chunks_for_path(file_path.clone(), cx);
|
||||||
|
|
||||||
cx.spawn(|this, mut cx| async move {
|
cx.spawn(|this, mut cx| async move {
|
||||||
let chunks = chunks.await?;
|
let chunks = chunks.await?;
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
457
crates/semantic_index/src/summary_index.rs
Normal file
457
crates/semantic_index/src/summary_index.rs
Normal file
@@ -0,0 +1,457 @@
|
|||||||
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
|
use arrayvec::ArrayString;
|
||||||
|
use completion::CompletionProvider;
|
||||||
|
use fs::Fs;
|
||||||
|
use futures::{stream::StreamExt, TryFutureExt};
|
||||||
|
use futures_batch::ChunksTimeoutStreamExt;
|
||||||
|
use gpui::{AppContext, Model, Task};
|
||||||
|
use heed::types::{SerdeBincode, Str};
|
||||||
|
use language_model::{LanguageModel, LanguageModelRequest, LanguageModelRequestMessage, Role};
|
||||||
|
use log;
|
||||||
|
use project::{Entry, UpdatedEntriesSet, Worktree};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use smol::channel;
|
||||||
|
use std::{
|
||||||
|
future::Future,
|
||||||
|
path::Path,
|
||||||
|
sync::Arc,
|
||||||
|
time::{Duration, Instant, SystemTime},
|
||||||
|
};
|
||||||
|
use util::ResultExt;
|
||||||
|
use worktree::Snapshot;
|
||||||
|
|
||||||
|
use crate::indexing::{IndexingEntryHandle, IndexingEntrySet};
|
||||||
|
|
||||||
|
/// This model should be good for summarizing code - fast, low price, and good at outputting English.
|
||||||
|
///
|
||||||
|
/// It's called "preferred" because if the model isn't available (e.g. due to lacking the necessary API key),
|
||||||
|
/// we fall back on the global CompletionProvider's selected model.
|
||||||
|
const PREFERRED_SUMMARIZATION_MODEL: LanguageModel =
|
||||||
|
LanguageModel::OpenAi(open_ai::Model::FourOmniMini);
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
struct UnsummarizedFile {
|
||||||
|
// BLAKE3 hash of the source file's contents
|
||||||
|
content_hash: String,
|
||||||
|
// The source file's contents
|
||||||
|
contents: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
struct SummarizedFile {
|
||||||
|
// BLAKE3 hash of the source file's contents
|
||||||
|
content_hash: String,
|
||||||
|
// The LLM's summary of the file's contents
|
||||||
|
summary: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This is what blake3's to_hex() method returns - see https://docs.rs/blake3/1.5.3/src/blake3/lib.rs.html#246
|
||||||
|
type Blake3Digest = ArrayString<{ blake3::OUT_LEN * 2 }>;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct FileDigest {
|
||||||
|
path: Arc<Path>,
|
||||||
|
mtime: Option<SystemTime>,
|
||||||
|
digest: Blake3Digest,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SummarizeFiles {
|
||||||
|
files: channel::Receiver<SummarizedFile>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct NeedsSummary {
|
||||||
|
files: channel::Receiver<UnsummarizedFile>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SummaryIndex {
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>, // Key: file path. Val: BLAKE3 digest of its contents.
|
||||||
|
summary_db: heed::Database<Str, Str>, // Key: BLAKE3 digest of a file's contents. Val: LLM summary of those contents.
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ScanEntries {
|
||||||
|
updated_entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MightNeedSummaryFiles {
|
||||||
|
files: channel::Receiver<UnsummarizedFile>,
|
||||||
|
task: Task<Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SummaryIndex {
|
||||||
|
pub fn new(
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
file_digest_db: heed::Database<Str, SerdeBincode<FileDigest>>,
|
||||||
|
summary_db: heed::Database<Str, Str>,
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
worktree,
|
||||||
|
fs,
|
||||||
|
db_connection,
|
||||||
|
file_digest_db,
|
||||||
|
summary_db,
|
||||||
|
entry_ids_being_indexed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_entries_changed_on_disk(
|
||||||
|
&self,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> impl Future<Output = Result<()>> {
|
||||||
|
let start = Instant::now();
|
||||||
|
let worktree = self.worktree.read(cx).snapshot();
|
||||||
|
let worktree_abs_path = worktree.abs_path().clone();
|
||||||
|
let scan = self.scan_entries(worktree, cx);
|
||||||
|
let digest = self.digest_files(worktree_abs_path, scan.updated_entries, cx);
|
||||||
|
let needs_summary = self.check_summary_cache(digest.files, cx);
|
||||||
|
let summaries = self.summarize_files(needs_summary.files, cx);
|
||||||
|
let persist = self.persist_summaries(summaries.files, cx);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
futures::try_join!(
|
||||||
|
scan.task,
|
||||||
|
digest.task,
|
||||||
|
needs_summary.task,
|
||||||
|
summaries.task,
|
||||||
|
persist
|
||||||
|
)?;
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"Summarizing everything that changed on disk took {:?}",
|
||||||
|
start.elapsed()
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index_updated_entries(
|
||||||
|
&self,
|
||||||
|
updated_entries: UpdatedEntriesSet,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> impl Future<Output = Result<()>> {
|
||||||
|
let start = Instant::now();
|
||||||
|
let worktree = self.worktree.read(cx).snapshot();
|
||||||
|
let worktree_abs_path = worktree.abs_path().clone();
|
||||||
|
let scan = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
|
||||||
|
let digest = self.digest_files(worktree_abs_path, scan.updated_entries, cx);
|
||||||
|
let needs_summary = self.check_summary_cache(digest.files, cx);
|
||||||
|
let summaries = self.summarize_files(needs_summary.files, cx);
|
||||||
|
let persist = self.persist_summaries(summaries.files, cx);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
futures::try_join!(
|
||||||
|
scan.task,
|
||||||
|
digest.task,
|
||||||
|
needs_summary.task,
|
||||||
|
summaries.task,
|
||||||
|
persist
|
||||||
|
)?;
|
||||||
|
|
||||||
|
log::info!("Summarizing updated entries took {:?}", start.elapsed());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_summary_cache(
|
||||||
|
&self,
|
||||||
|
mut might_need_summary: channel::Receiver<UnsummarizedFile>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> NeedsSummary {
|
||||||
|
let db_connection = self.db_connection.clone();
|
||||||
|
let db = self.summary_db;
|
||||||
|
let (needs_summary_tx, needs_summary_rx) = channel::bounded(512);
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
while let Some(file) = might_need_summary.next().await {
|
||||||
|
let tx = db_connection
|
||||||
|
.read_txn()
|
||||||
|
.context("Failed to create read transaction for checking which hashes are in summary cache")?;
|
||||||
|
|
||||||
|
match db.get(&tx, &file.content_hash) {
|
||||||
|
Ok(opt_answer) => {
|
||||||
|
if opt_answer.is_none() {
|
||||||
|
// It's not in the summary cache db, so we need to summarize it.
|
||||||
|
log::debug!("{:?} was NOT in the db cache and needs to be resummarized.", &file.content_hash);
|
||||||
|
needs_summary_tx.send(file).await?;
|
||||||
|
} else {
|
||||||
|
log::debug!("{:?} was in the db cache and does not need to be resummarized.", &file.content_hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::error!("Reading from the summaries database failed: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
NeedsSummary {
|
||||||
|
files: needs_summary_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> ScanEntries {
|
||||||
|
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||||
|
let db_connection = self.db_connection.clone();
|
||||||
|
let digest_db = self.file_digest_db;
|
||||||
|
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
let txn = db_connection
|
||||||
|
.read_txn()
|
||||||
|
.context("failed to create read transaction")?;
|
||||||
|
// let mut db_entries = digest_db
|
||||||
|
// .iter(&txn)
|
||||||
|
// .context("failed to create iterator")?
|
||||||
|
// .move_between_keys()
|
||||||
|
// .peekable();
|
||||||
|
|
||||||
|
for entry in worktree.files(false, 0) {
|
||||||
|
let entry_db_key = db_key_for_path(&entry.path);
|
||||||
|
|
||||||
|
match digest_db.get(&txn, &entry_db_key) {
|
||||||
|
Ok(opt_saved_digest) => {
|
||||||
|
// The file path is the same, but the mtime is different. Update it!
|
||||||
|
if entry.mtime != opt_saved_digest.and_then(|digest| digest.mtime) {
|
||||||
|
let handle = entries_being_indexed.insert(entry.id);
|
||||||
|
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::error!(
|
||||||
|
"Error trying to get file digest db entry {:?}: {:?}",
|
||||||
|
&entry_db_key,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO delete db entries for deleted files
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ScanEntries {
|
||||||
|
updated_entries: updated_entries_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_updated_entries(
|
||||||
|
&self,
|
||||||
|
worktree: Snapshot,
|
||||||
|
updated_entries: UpdatedEntriesSet,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> ScanEntries {
|
||||||
|
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||||
|
// let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||||
|
let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||||
|
let task = cx.background_executor().spawn(async move {
|
||||||
|
for (path, entry_id, status) in updated_entries.iter() {
|
||||||
|
match status {
|
||||||
|
project::PathChange::Added
|
||||||
|
| project::PathChange::Updated
|
||||||
|
| project::PathChange::AddedOrUpdated => {
|
||||||
|
if let Some(entry) = worktree.entry_for_id(*entry_id) {
|
||||||
|
if entry.is_file() {
|
||||||
|
let handle = entries_being_indexed.insert(entry.id);
|
||||||
|
updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
project::PathChange::Removed => {
|
||||||
|
let _db_path = db_key_for_path(path);
|
||||||
|
// TODO delete db entries for deleted files
|
||||||
|
// deleted_entry_ranges_tx
|
||||||
|
// .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
|
||||||
|
// .await?;
|
||||||
|
}
|
||||||
|
project::PathChange::Loaded => {
|
||||||
|
// Do nothing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
ScanEntries {
|
||||||
|
updated_entries: updated_entries_rx,
|
||||||
|
// deleted_entry_ranges: deleted_entry_ranges_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn digest_files(
|
||||||
|
&self,
|
||||||
|
worktree_abs_path: Arc<Path>,
|
||||||
|
entries: channel::Receiver<(Entry, IndexingEntryHandle)>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> MightNeedSummaryFiles {
|
||||||
|
let fs = self.fs.clone();
|
||||||
|
let (might_need_summary_tx, might_need_summary_rx) = channel::bounded(2048);
|
||||||
|
let task = cx.spawn(|cx| async move {
|
||||||
|
cx.background_executor()
|
||||||
|
.scoped(|cx| {
|
||||||
|
for _ in 0..cx.num_cpus() {
|
||||||
|
cx.spawn(async {
|
||||||
|
while let Ok((entry, handle)) = entries.recv().await {
|
||||||
|
let entry_abs_path = worktree_abs_path.join(&entry.path);
|
||||||
|
let Some(text) = fs
|
||||||
|
.load(&entry_abs_path)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!("failed to read path {entry_abs_path:?}")
|
||||||
|
})
|
||||||
|
.log_err()
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let content_hash = {
|
||||||
|
let mut hasher = blake3::Hasher::new();
|
||||||
|
|
||||||
|
hasher.update(text.as_bytes());
|
||||||
|
|
||||||
|
hasher.finalize().to_hex().to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
let unsummarized_file = UnsummarizedFile {
|
||||||
|
content_hash,
|
||||||
|
contents: text,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = might_need_summary_tx
|
||||||
|
.send(unsummarized_file)
|
||||||
|
.map_err(|error| anyhow!(error))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
log::error!("Error: {:?}", err);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
MightNeedSummaryFiles {
|
||||||
|
files: might_need_summary_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn summarize_files(
|
||||||
|
&self,
|
||||||
|
mut unsummarized_files: channel::Receiver<UnsummarizedFile>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> SummarizeFiles {
|
||||||
|
let (summarized_tx, summarized_rx) = channel::bounded(512);
|
||||||
|
let task = cx.spawn(|cx| async move {
|
||||||
|
while let Some(file) = unsummarized_files.next().await {
|
||||||
|
log::debug!("Summarizing {:?}", file);
|
||||||
|
let summary = cx.update(|cx| Self::summarize_code(&file.contents, cx))?;
|
||||||
|
|
||||||
|
summarized_tx
|
||||||
|
.send(SummarizedFile {
|
||||||
|
content_hash: file.content_hash,
|
||||||
|
summary: summary.await?,
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
SummarizeFiles {
|
||||||
|
files: summarized_rx,
|
||||||
|
task,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn summarize_code(code: &str, cx: &AppContext) -> impl Future<Output = Result<String>> {
|
||||||
|
let start = Instant::now();
|
||||||
|
let provider = CompletionProvider::global(cx);
|
||||||
|
let model = PREFERRED_SUMMARIZATION_MODEL;
|
||||||
|
const PROMPT_BEFORE_CODE: &str = "Summarize this code in 3 sentences, using no newlines or bullet points in the summary:";
|
||||||
|
let prompt = format!("{PROMPT_BEFORE_CODE}\n{code}");
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"Summarizing code by sending this prompt to {:?}: {:?}",
|
||||||
|
&model,
|
||||||
|
&prompt
|
||||||
|
);
|
||||||
|
|
||||||
|
let request = LanguageModelRequest {
|
||||||
|
model,
|
||||||
|
messages: vec![LanguageModelRequestMessage {
|
||||||
|
role: Role::User,
|
||||||
|
content: prompt,
|
||||||
|
}],
|
||||||
|
stop: Vec::new(),
|
||||||
|
temperature: 1.0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = provider.stream_completion_bg(request, cx);
|
||||||
|
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
let mut chunks = response.await?;
|
||||||
|
let mut answer = String::new();
|
||||||
|
|
||||||
|
while let Some(chunk) = chunks.next().await {
|
||||||
|
answer.push_str(chunk?.as_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
log::info!("Code summarization took {:?}", start.elapsed());
|
||||||
|
Ok(answer)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn persist_summaries(
|
||||||
|
&self,
|
||||||
|
summaries: channel::Receiver<SummarizedFile>,
|
||||||
|
cx: &AppContext,
|
||||||
|
) -> Task<Result<()>> {
|
||||||
|
let db_connection = self.db_connection.clone();
|
||||||
|
let db = self.summary_db;
|
||||||
|
cx.background_executor().spawn(async move {
|
||||||
|
let mut summaries = summaries.chunks_timeout(4096, Duration::from_secs(2));
|
||||||
|
while let Some(summaries) = summaries.next().await {
|
||||||
|
let mut txn = db_connection.write_txn()?;
|
||||||
|
for file in &summaries {
|
||||||
|
log::debug!(
|
||||||
|
"Saving {} bytes of summary for content hash {:?}",
|
||||||
|
file.summary.len(),
|
||||||
|
file.content_hash
|
||||||
|
);
|
||||||
|
db.put(&mut txn, &file.content_hash, &file.summary)?;
|
||||||
|
}
|
||||||
|
txn.commit()?;
|
||||||
|
|
||||||
|
drop(summaries);
|
||||||
|
log::debug!("committed summaries");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds the database key for a worktree-relative path: the lossily
/// UTF-8-decoded path with every `/` replaced by a NUL character.
fn db_key_for_path(path: &Arc<Path>) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '/' { '\0' } else { ch })
        .collect()
}
|
||||||
279
crates/semantic_index/src/worktree_index.rs
Normal file
279
crates/semantic_index/src/worktree_index.rs
Normal file
@@ -0,0 +1,279 @@
|
|||||||
|
use crate::embedding::EmbeddingProvider;
|
||||||
|
use crate::embedding_index::EmbeddingIndex;
|
||||||
|
use crate::indexing::IndexingEntrySet;
|
||||||
|
use crate::summary_index::SummaryIndex;
|
||||||
|
use anyhow::Result;
|
||||||
|
use fs::Fs;
|
||||||
|
use futures::future::Shared;
|
||||||
|
use gpui::{
|
||||||
|
AppContext, AsyncAppContext, Context, Model, ModelContext, Subscription, Task, WeakModel,
|
||||||
|
};
|
||||||
|
use language::LanguageRegistry;
|
||||||
|
use log;
|
||||||
|
use project::{UpdatedEntriesSet, Worktree};
|
||||||
|
use smol::channel;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use util::ResultExt;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum WorktreeIndexHandle {
|
||||||
|
Loading {
|
||||||
|
index: Shared<Task<Result<Model<WorktreeIndex>, Arc<anyhow::Error>>>>,
|
||||||
|
},
|
||||||
|
Loaded {
|
||||||
|
index: Model<WorktreeIndex>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct WorktreeIndex {
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
embedding_index: EmbeddingIndex,
|
||||||
|
summary_index: SummaryIndex,
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
_index_entries: Task<Result<()>>,
|
||||||
|
_subscription: Subscription,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorktreeIndex {
|
||||||
|
pub fn load(
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
language_registry: Arc<LanguageRegistry>,
|
||||||
|
fs: Arc<dyn Fs>,
|
||||||
|
status_tx: channel::Sender<()>,
|
||||||
|
embedding_provider: Arc<dyn EmbeddingProvider>,
|
||||||
|
cx: &mut AppContext,
|
||||||
|
) -> Task<Result<Model<Self>>> {
|
||||||
|
let worktree_for_index = worktree.clone();
|
||||||
|
let worktree_for_summary = worktree.clone();
|
||||||
|
let worktree_abs_path = worktree.read(cx).abs_path();
|
||||||
|
let embedding_fs = Arc::clone(&fs);
|
||||||
|
let summary_fs = fs;
|
||||||
|
cx.spawn(|mut cx| async move {
|
||||||
|
let entries_being_indexed = Arc::new(IndexingEntrySet::new(status_tx));
|
||||||
|
let (embedding_index, summary_index) = cx
|
||||||
|
.background_executor()
|
||||||
|
.spawn({
|
||||||
|
let entries_being_indexed = Arc::clone(&entries_being_indexed);
|
||||||
|
let db_connection = db_connection.clone();
|
||||||
|
async move {
|
||||||
|
let mut txn = db_connection.write_txn()?;
|
||||||
|
let embedding_index = {
|
||||||
|
let db_name = worktree_abs_path.to_string_lossy();
|
||||||
|
let db = db_connection.create_database(&mut txn, Some(&db_name))?;
|
||||||
|
|
||||||
|
EmbeddingIndex::new(
|
||||||
|
worktree_for_index,
|
||||||
|
embedding_fs,
|
||||||
|
db_connection.clone(),
|
||||||
|
db,
|
||||||
|
language_registry,
|
||||||
|
embedding_provider,
|
||||||
|
Arc::clone(&entries_being_indexed),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
let summary_index = {
|
||||||
|
let file_digest_db = {
|
||||||
|
let db_name =
|
||||||
|
// Prepend something that wouldn't be found at the beginning of an
|
||||||
|
// absolute path, so we don't get db key namespace conflicts with
|
||||||
|
// embeddings, which use the abs path as a key.
|
||||||
|
format!("digests-{}", worktree_abs_path.to_string_lossy());
|
||||||
|
db_connection.create_database(&mut txn, Some(&db_name))?
|
||||||
|
};
|
||||||
|
let summary_db = {
|
||||||
|
let db_name =
|
||||||
|
// Prepend something that wouldn't be found at the beginning of an
|
||||||
|
// absolute path, so we don't get db key namespace conflicts with
|
||||||
|
// embeddings, which use the abs path as a key.
|
||||||
|
format!("summaries-{}", worktree_abs_path.to_string_lossy());
|
||||||
|
db_connection.create_database(&mut txn, Some(&db_name))?
|
||||||
|
};
|
||||||
|
SummaryIndex::new(
|
||||||
|
worktree_for_summary,
|
||||||
|
summary_fs,
|
||||||
|
db_connection.clone(),
|
||||||
|
file_digest_db,
|
||||||
|
summary_db,
|
||||||
|
Arc::clone(&entries_being_indexed),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
txn.commit()?;
|
||||||
|
anyhow::Ok((embedding_index, summary_index))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
cx.new_model(|cx| {
|
||||||
|
Self::new(
|
||||||
|
worktree,
|
||||||
|
db_connection,
|
||||||
|
embedding_index,
|
||||||
|
summary_index,
|
||||||
|
entries_being_indexed,
|
||||||
|
cx,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub fn new(
|
||||||
|
worktree: Model<Worktree>,
|
||||||
|
db_connection: heed::Env,
|
||||||
|
embedding_index: EmbeddingIndex,
|
||||||
|
summary_index: SummaryIndex,
|
||||||
|
entry_ids_being_indexed: Arc<IndexingEntrySet>,
|
||||||
|
cx: &mut ModelContext<Self>,
|
||||||
|
) -> Self {
|
||||||
|
let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
|
||||||
|
let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
|
||||||
|
if let worktree::Event::UpdatedEntries(update) = event {
|
||||||
|
dbg!(&update);
|
||||||
|
log::debug!("Updating entries...");
|
||||||
|
_ = updated_entries_tx.try_send(update.clone());
|
||||||
|
} else {
|
||||||
|
dbg!("non-update event");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Self {
|
||||||
|
db_connection,
|
||||||
|
embedding_index,
|
||||||
|
summary_index,
|
||||||
|
worktree,
|
||||||
|
entry_ids_being_indexed,
|
||||||
|
_index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
|
||||||
|
_subscription,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Borrows the shared set held in the `entry_ids_being_indexed` field.
pub fn entry_ids_being_indexed(&self) -> &IndexingEntrySet {
    // `&Arc<T>` deref-coerces to `&T` at the return site.
    &self.entry_ids_being_indexed
}
|
||||||
|
|
||||||
|
/// Borrows the worktree model this index was created for.
pub fn worktree(&self) -> &Model<Worktree> {
    &self.worktree
}
|
||||||
|
|
||||||
|
/// Borrows the `heed` environment stored on this index.
pub fn db_connection(&self) -> &heed::Env {
    &self.db_connection
}
|
||||||
|
|
||||||
|
/// Borrows the embedding index owned by this value.
pub fn embedding_index(&self) -> &EmbeddingIndex {
    &self.embedding_index
}
|
||||||
|
|
||||||
|
async fn index_entries(
|
||||||
|
this: WeakModel<Self>,
|
||||||
|
updated_entries: channel::Receiver<UpdatedEntriesSet>,
|
||||||
|
mut cx: AsyncAppContext,
|
||||||
|
) -> Result<()> {
|
||||||
|
let index = this.update(&mut cx, |this, cx| {
|
||||||
|
futures::future::try_join(
|
||||||
|
this.embedding_index.index_entries_changed_on_disk(cx),
|
||||||
|
this.summary_index.index_entries_changed_on_disk(cx),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
index.await.log_err();
|
||||||
|
|
||||||
|
while let Ok(updated_entries) = updated_entries.recv().await {
|
||||||
|
let index = this.update(&mut cx, |this, cx| {
|
||||||
|
futures::future::try_join(
|
||||||
|
this.embedding_index
|
||||||
|
.index_updated_entries(updated_entries.clone(), cx),
|
||||||
|
this.summary_index
|
||||||
|
.index_updated_entries(updated_entries, cx),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
index.await.log_err();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test-only helper returning the value of `len` on the embedding index's database.
///
/// NOTE(review): presumably one entry per indexed path, going by the method name - confirm.
///
/// # Errors
///
/// Fails if a read transaction cannot be opened or if the length query fails.
#[cfg(test)]
pub fn path_count(&self) -> Result<u64> {
    use anyhow::Context;

    let read_txn = self
        .db_connection
        .read_txn()
        .context("failed to create read transaction")?;
    let count = self.embedding_index.db().len(&read_txn)?;
    Ok(count)
}
|
||||||
|
|
||||||
|
// fn index_updated_entries(
|
||||||
|
// &self,
|
||||||
|
// updated_entries: UpdatedEntriesSet,
|
||||||
|
// cx: &AppContext,
|
||||||
|
// ) -> impl Future<Output = Result<()>> {
|
||||||
|
// log::debug!("index_updated_entries({:?})", &updated_entries);
|
||||||
|
// let worktree = self.worktree.read(cx).snapshot();
|
||||||
|
// let worktree_abs_path = worktree.abs_path().clone();
|
||||||
|
// let scan = self.scan_updated_entries(worktree, updated_entries.clone(), cx);
|
||||||
|
// let processed = self.process_files(worktree_abs_path, scan.updated_entries, cx);
|
||||||
|
// let embed = Self::embed_files(self.embedding_provider.clone(), processed.chunked_files, cx);
|
||||||
|
// let might_need_summary = self.check_summary_cache(processed.might_need_summary, cx);
|
||||||
|
// let summarized = self.summarize_files(might_need_summary.files, cx);
|
||||||
|
// let persist_summaries = self.persist_summaries(summarized.files, cx);
|
||||||
|
// let persist_embeds = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);
|
||||||
|
// async move {
|
||||||
|
// futures::try_join!(
|
||||||
|
// scan.task,
|
||||||
|
// processed.task,
|
||||||
|
// embed.task,
|
||||||
|
// might_need_summary.task,
|
||||||
|
// summarized.task,
|
||||||
|
// persist_embeds,
|
||||||
|
// persist_summaries
|
||||||
|
// )?;
|
||||||
|
// Ok(())
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// fn scan_updated_entries(
|
||||||
|
// &self,
|
||||||
|
// worktree: Snapshot,
|
||||||
|
// updated_entries: UpdatedEntriesSet,
|
||||||
|
// cx: &AppContext,
|
||||||
|
// ) -> ScanEntries {
|
||||||
|
// let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
|
||||||
|
// let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
|
||||||
|
// let entries_being_indexed = self.entry_ids_being_indexed.clone();
|
||||||
|
// let task = cx.background_executor().spawn(async move {
|
||||||
|
// for (path, entry_id, status) in updated_entries.iter() {
|
||||||
|
// match status {
|
||||||
|
// project::PathChange::Added
|
||||||
|
// | project::PathChange::Updated
|
||||||
|
// | project::PathChange::AddedOrUpdated => {
|
||||||
|
// if let Some(entry) = worktree.entry_for_id(*entry_id) {
|
||||||
|
// if entry.is_file() {
|
||||||
|
// let handle = entries_being_indexed.insert(entry.id);
|
||||||
|
// updated_entries_tx.send((entry.clone(), handle)).await?;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// project::PathChange::Removed => {
|
||||||
|
// let db_path = db_key_for_path(path);
|
||||||
|
// deleted_entry_ranges_tx
|
||||||
|
// .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
|
||||||
|
// .await?;
|
||||||
|
// }
|
||||||
|
// project::PathChange::Loaded => {
|
||||||
|
// // Do nothing.
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Ok(())
|
||||||
|
// });
|
||||||
|
|
||||||
|
// ScanEntries {
|
||||||
|
// updated_entries: updated_entries_rx,
|
||||||
|
// deleted_entry_ranges: deleted_entry_ranges_rx,
|
||||||
|
// task,
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
}
|
||||||
@@ -106,6 +106,7 @@ vim.workspace = true
|
|||||||
welcome.workspace = true
|
welcome.workspace = true
|
||||||
workspace.workspace = true
|
workspace.workspace = true
|
||||||
zed_actions.workspace = true
|
zed_actions.workspace = true
|
||||||
|
blake3 = "1.5.3"
|
||||||
|
|
||||||
[target.'cfg(target_os = "windows")'.build-dependencies]
|
[target.'cfg(target_os = "windows")'.build-dependencies]
|
||||||
winresource = "0.1"
|
winresource = "0.1"
|
||||||
|
|||||||
Reference in New Issue
Block a user