diff --git a/Cargo.lock b/Cargo.lock index ffa2a2db44..b301a2c0bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9587,6 +9587,7 @@ name = "semantic_index" version = "0.1.0" dependencies = [ "anyhow", + "blake3", "client", "clock", "collections", @@ -10537,43 +10538,6 @@ dependencies = [ "rayon", ] -[[package]] -name = "summaries" -version = "0.1.0" -dependencies = [ - "anthropic", - "anyhow", - "client", - "collections", - "completion", - "ctor", - "editor", - "env_logger", - "futures 0.3.28", - "gpui", - "http 0.1.0", - "language", - "language_model", - "log", - "menu", - "ollama", - "open_ai", - "parking_lot", - "project", - "rand 0.8.5", - "serde", - "serde_json", - "settings", - "smol", - "strum", - "text", - "theme", - "tiktoken-rs", - "ui", - "unindent", - "util", -] - [[package]] name = "supermaven" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 68afdfe547..7feb4610cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,6 @@ members = [ "crates/story", "crates/storybook", "crates/sum_tree", - "crates/summaries", "crates/supermaven", "crates/supermaven_api", "crates/tab_switcher", @@ -257,7 +256,6 @@ sqlez_macros = { path = "crates/sqlez_macros" } story = { path = "crates/story" } storybook = { path = "crates/storybook" } sum_tree = { path = "crates/sum_tree" } -summaries = { path = "crates/summaries" } supermaven = { path = "crates/supermaven" } supermaven_api = { path = "crates/supermaven_api" } tab_switcher = { path = "crates/tab_switcher" } diff --git a/crates/semantic_index/Cargo.toml b/crates/semantic_index/Cargo.toml index ca669d6d8d..fda964f5c5 100644 --- a/crates/semantic_index/Cargo.toml +++ b/crates/semantic_index/Cargo.toml @@ -19,6 +19,7 @@ crate-type = ["bin"] [dependencies] anyhow.workspace = true +blake3.workspace = true client.workspace = true clock.workspace = true collections.workspace = true diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index f9915225ba..28fc74b16b 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -8,7 +8,7 @@ use collections::{Bound, HashMap, HashSet}; use completion::CompletionProvider; pub use embedding::*; use fs::Fs; -use futures::{future::Shared, stream::StreamExt, FutureExt}; +use futures::{future::Shared, stream::StreamExt, FutureExt, TryFutureExt}; use futures_batch::ChunksTimeoutStreamExt; use gpui::{ AppContext, AsyncAppContext, BorrowAppContext, Context, Entity, EntityId, EventEmitter, Global, @@ -46,7 +46,6 @@ const PREFERRED_SUMMARIZATION_MODEL: LanguageModel = pub struct SemanticIndex { embedding_provider: Arc, - summary_provider: Arc, db_connection: heed::Env, project_indices: HashMap, Model>, } @@ -57,7 +56,6 @@ impl SemanticIndex { pub async fn new( db_path: PathBuf, embedding_provider: Arc, - summary_provider: Arc, cx: &mut AsyncAppContext, ) -> Result { let db_connection = cx @@ -77,7 +75,6 @@ impl SemanticIndex { Ok(SemanticIndex { db_connection, embedding_provider, - summary_provider, project_indices: HashMap::default(), }) } @@ -107,7 +104,6 @@ impl SemanticIndex { project, self.db_connection.clone(), self.embedding_provider.clone(), - self.summary_provider.clone(), cx, ) }) @@ -125,7 +121,6 @@ pub struct ProjectIndex { last_status: Status, status_tx: channel::Sender<()>, embedding_provider: Arc, - summary_provider: Arc, _maintain_status: Task<()>, _subscription: Subscription, } @@ -145,7 +140,6 @@ impl ProjectIndex { project: Model, db_connection: heed::Env, embedding_provider: Arc, - summary_provider: Arc, cx: &mut ModelContext, ) -> Self { let language_registry = project.read(cx).languages().clone(); @@ -160,7 +154,6 @@ impl ProjectIndex { status_tx, last_status: Status::Idle, embedding_provider, - summary_provider, _subscription: cx.subscribe(&project, Self::handle_project_event), _maintain_status: cx.spawn(|this, mut cx| async move { while status_rx.next().await.is_some() { @@ -231,7 +224,6 @@ impl ProjectIndex { self.fs.clone(), self.status_tx.clone(), self.embedding_provider.clone(), - self.summary_provider.clone(), cx, ); @@ -507,7 +499,6 @@ struct WorktreeIndex { language_registry: Arc, fs: Arc, embedding_provider: Arc, - summary_provider: Arc, entry_ids_being_indexed: Arc, _index_entries: Task>, _subscription: Subscription, @@ -521,7 +512,6 @@ impl WorktreeIndex { fs: Arc, status_tx: channel::Sender<()>, embedding_provider: Arc, - summary_provider: Arc, cx: &mut AppContext, ) -> Task>> { let worktree_abs_path = worktree.read(cx).abs_path(); @@ -559,7 +549,6 @@ impl WorktreeIndex { language_registry, fs, embedding_provider, - summary_provider, cx, ) }) @@ -576,7 +565,6 @@ impl WorktreeIndex { language_registry: Arc, fs: Arc, embedding_provider: Arc, - summary_provider: Arc, cx: &mut ModelContext, ) -> Self { let (updated_entries_tx, updated_entries_rx) = channel::unbounded(); @@ -594,7 +582,6 @@ impl WorktreeIndex { language_registry, fs, embedding_provider, - summary_provider, entry_ids_being_indexed: Arc::new(IndexingEntrySet::new(status)), _index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)), _subscription, @@ -623,18 +610,19 @@ impl WorktreeIndex { let worktree = self.worktree.read(cx).snapshot(); let worktree_abs_path = worktree.abs_path().clone(); let scan = self.scan_entries(worktree, cx); - // TODO: summarize in here let processed = self.process_files(worktree_abs_path, scan.updated_entries, cx); let embed = Self::embed_files(self.embedding_provider.clone(), processed.chunked_files, cx); - let summaries = Self::summarize_files(processed.unsummarized_files, cx); + let might_need_summary = self.check_summary_cache(processed.might_need_summary, cx); + let summarized = self.summarize_files(might_need_summary.files, cx); + let persist_summaries = self.persist_summaries(summarized.files, cx); let persist_embeds = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); - let persist_summaries = self.persist_summaries(summaries.files, cx); async move { futures::try_join!( scan.task, processed.task, embed.task, - summaries.task, + might_need_summary.task, + summarized.task, persist_embeds, persist_summaries )?; @@ -652,15 +640,17 @@ impl WorktreeIndex { let scan = self.scan_updated_entries(worktree, updated_entries.clone(), cx); let processed = self.process_files(worktree_abs_path, scan.updated_entries, cx); let embed = Self::embed_files(self.embedding_provider.clone(), processed.chunked_files, cx); - let summaries = Self::summarize_files(processed.unsummarized_files, cx); + let might_need_summary = self.check_summary_cache(processed.might_need_summary, cx); + let summarized = self.summarize_files(might_need_summary.files, cx); + let persist_summaries = self.persist_summaries(summarized.files, cx); let persist_embeds = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); - let persist_summaries = self.persist_summaries(summaries.files, cx); async move { futures::try_join!( scan.task, processed.task, embed.task, - summaries.task, + might_need_summary.task, + summarized.task, persist_embeds, persist_summaries )?; @@ -835,7 +825,7 @@ impl WorktreeIndex { let language_registry = self.language_registry.clone(); let fs = self.fs.clone(); let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048); - let (unsummarized_tx, unsummarized_rx) = channel::bounded(2048); + let (might_need_summary_tx, might_need_summary_rx) = channel::bounded(2048); let task = cx.spawn(|cx| async move { cx.background_executor() .scoped(|cx| { @@ -862,11 +852,38 @@ impl WorktreeIndex { handle, path: entry.path, mtime: entry.mtime, - text, + text: text.clone(), }; - if chunked_files_tx.send(chunked_file).await.is_err() { - return; + let content_hash = { + let mut hasher = blake3::Hasher::new(); + + hasher.update(text.as_bytes()); + + hasher.finalize().to_hex().to_string() + }; + + let unsummarized_file = UnsummarizedFile { + content_hash, + contents: text, + }; + + match futures::future::try_join( + might_need_summary_tx + .send(unsummarized_file) + .map_err(|error| anyhow!(error)), + chunked_files_tx + .send(chunked_file) + .map_err(|error| anyhow!(error)), + ) + .await + { + Ok(_) => {} + Err(err) => { + log::error!("Error: {:?}", err); + + return; + } } } }); @@ -878,7 +895,7 @@ impl WorktreeIndex { ProcessedFiles { chunked_files: chunked_files_rx, - unsummarized_files: unsummarized_rx, + might_need_summary: might_need_summary_rx, task, } } @@ -1002,7 +1019,7 @@ impl WorktreeIndex { fn persist_summaries( &self, - summaries: channel::Receiver<(SummarizedFile, IndexingEntryHandle)>, + summaries: channel::Receiver, cx: &AppContext, ) -> Task> { let db_connection = self.db_connection.clone(); @@ -1011,7 +1028,7 @@ impl WorktreeIndex { let mut summaries = summaries.chunks_timeout(4096, Duration::from_secs(2)); while let Some(summaries) = summaries.next().await { let mut txn = db_connection.write_txn()?; - for (file, _) in &summaries { + for file in &summaries { log::debug!("saving summary for content hash {:?}", file.content_hash); db.put(&mut txn, &file.content_hash, &file.summary)?; } @@ -1070,117 +1087,64 @@ impl WorktreeIndex { } fn summarize_files( - unsummarized_files: channel::Receiver, + &self, + mut unsummarized_files: channel::Receiver, cx: &AppContext, ) -> SummarizeFiles { - let completion_provider = CompletionProvider::global(cx); let (summarized_tx, summarized_rx) = channel::bounded(512); + let task = cx.spawn(|cx| async move { + while let Some(file) = unsummarized_files.next().await { + let summary = cx.update(|cx| Self::summarize_code(&file.contents, cx))?; - let task = cx.spawn(|cx| async { - let mut summaries = Vec::new(); - let mut summarize_tasks = Vec::new(); - let mut unsummarized_receiver = unsummarized_files.clone(); + summarized_tx + .send(SummarizedFile { + content_hash: file.content_hash, + summary: summary.await?, + }) + .await? + } - loop { - futures::select! { - next = unsummarized_receiver.next() => { - if let Some(file) = next { - let task = cx.update(|cx| Self::summarize_code(&file.contents, cx)); + Ok(()) + }); - summarize_tasks.push(task); - } - }, - result = futures::future::select_all(&mut summarize_tasks) => { - match result { - Ok(_) => {} - Err(_) => {} + SummarizeFiles { + files: summarized_rx, + task, + } + } + + fn check_summary_cache( + &self, + mut might_need_summary: channel::Receiver, + cx: &AppContext, + ) -> NeedsSummary { + let db_connection = self.db_connection.clone(); + let db = self.summary_db; + let (needs_summary_tx, needs_summary_rx) = channel::bounded(512); + let task = cx.background_executor().spawn(async move { + while let Some(file) = might_need_summary.next().await { + let tx = db_connection + .read_txn() + .context("Failed to create read transaction for checking which hashes are in summary cache")?; + + match db.get(&tx, &file.content_hash) { + Ok(opt_answer) => { + // It's not in the summary cache db, so we need to summarize it. + if opt_answer.is_none() { + needs_summary_tx.send(file).await?; } } - complete => break, + Err(err) => { + log::error!("Reading from the summaries database failed: {:?}", err); + } } } Ok(()) }); - // let summaries = futures::future::join_all(summarize_tasks); - - // let task = cx.background_executor().spawn(async move { - // let mut unsummarized_batches = - // unsummarized_files.chunks_timeout(512, Duration::from_secs(2)); - // while let Some(unsummarized) = unsummarized_batches.next().await { - // // View the batch of files as a vec of chunks - // // Flatten out to a vec of chunks that we can subdivide into batch sized pieces - // // Once those are done, reassemble them back into the files in which they belong - // // If any embeddings fail for a file, the entire file is discarded - - // let summaries = futures::future::join_all( - // unsummarized.iter().map(|file| { - // Self::summarize_code(&file.contents, cx) - // }) - // ); - - // let chunks: Vec = unsummarized - // .iter() - // .flat_map(|file| { - // file.chunks.iter().map(|chunk| TextToEmbed { - // text: &file.text[chunk.range.clone()], - // digest: chunk.digest, - // }) - // }) - // .collect::>(); - - // let mut embeddings: Vec> = Vec::new(); - // for embedding_batch in chunks.chunks(embedding_provider.batch_size()) { - // if let Some(batch_embeddings) = - // embedding_provider.embed(embedding_batch).await.log_err() - // { - // if batch_embeddings.len() == embedding_batch.len() { - // embeddings.extend(batch_embeddings.into_iter().map(Some)); - // continue; - // } - // log::error!( - // "embedding provider returned unexpected embedding count {}, expected {}", - // batch_embeddings.len(), embedding_batch.len() - // ); - // } - - // embeddings.extend(iter::repeat(None).take(embedding_batch.len())); - // } - - // let mut embeddings = embeddings.into_iter(); - // for chunked_file in unsummarized { - // let mut embedded_file = EmbeddedFile { - // path: chunked_file.path, - // mtime: chunked_file.mtime, - // chunks: Vec::new(), - // }; - - // let mut embedded_all_chunks = true; - // for (chunk, embedding) in - // chunked_file.chunks.into_iter().zip(embeddings.by_ref()) - // { - // if let Some(embedding) = embedding { - // embedded_file - // .chunks - // .push(EmbeddedChunk { chunk, embedding }); - // } else { - // embedded_all_chunks = false; - // } - // } - - // if embedded_all_chunks { - // summarized_tx - // .send((embedded_file, chunked_file.handle)) - // .await?; - // } - // } - // } - // Ok(()) - // }); - - SummarizeFiles { - files: summarized_rx, + NeedsSummary { + files: needs_summary_rx, task, } } @@ -1194,7 +1158,7 @@ struct ScanEntries { struct ProcessedFiles { chunked_files: channel::Receiver, - unsummarized_files: channel::Receiver, + might_need_summary: channel::Receiver, task: Task>, } @@ -1225,7 +1189,12 @@ struct EmbeddedChunk { } struct SummarizeFiles { - files: channel::Receiver<(SummarizedFile, IndexingEntryHandle)>, + files: channel::Receiver, + task: Task>, +} + +struct NeedsSummary { + files: channel::Receiver, task: Task>, } @@ -1518,6 +1487,3 @@ mod tests { ); } } - -// See https://github.com/zed-industries/zed/pull/14823#discussion_r1684616398 for why this is here and when it should be removed. -type _TODO = completion::CompletionProvider; diff --git a/crates/summaries/Cargo.toml b/crates/summaries/Cargo.toml deleted file mode 100644 index d922bcf8f8..0000000000 --- a/crates/summaries/Cargo.toml +++ /dev/null @@ -1,57 +0,0 @@ -[package] -name = "summaries" -version = "0.1.0" -edition = "2021" -publish = false -license = "GPL-3.0-or-later" - -[lints] -workspace = true - -[lib] -path = "src/summaries.rs" -doctest = false - -[features] -test-support = [ - "editor/test-support", - "language/test-support", - "project/test-support", - "text/test-support", -] - -[dependencies] -anthropic = { workspace = true, features = ["schemars"] } -anyhow.workspace = true -client.workspace = true -collections.workspace = true -completion.workspace = true -editor.workspace = true -futures.workspace = true -gpui.workspace = true -http.workspace = true -language_model.workspace = true -log.workspace = true -menu.workspace = true -ollama = { workspace = true, features = ["schemars"] } -open_ai = { workspace = true, features = ["schemars"] } -parking_lot.workspace = true -serde.workspace = true -serde_json.workspace = true -settings.workspace = true -smol.workspace = true -strum.workspace = true -theme.workspace = true -tiktoken-rs.workspace = true -ui.workspace = true -util.workspace = true - -[dev-dependencies] -ctor.workspace = true -editor = { workspace = true, features = ["test-support"] } -env_logger.workspace = true -language = { workspace = true, features = ["test-support"] } -project = { workspace = true, features = ["test-support"] } -rand.workspace = true -text = { workspace = true, features = ["test-support"] } -unindent.workspace = true diff --git a/crates/summaries/LICENSE-GPL b/crates/summaries/LICENSE-GPL deleted file mode 120000 index a8c09f35f1..0000000000 --- a/crates/summaries/LICENSE-GPL +++ /dev/null @@ -1 +0,0 @@ -/Users/rtfeldman/code/zed/LICENSE-GPL \ No newline at end of file diff --git a/crates/summaries/src/cache_summaries.rs b/crates/summaries/src/cache_summaries.rs deleted file mode 100644 index d99fb9d45a..0000000000 --- a/crates/summaries/src/cache_summaries.rs +++ /dev/null @@ -1,106 +0,0 @@ -use ::fs::Fs; -use anyhow::Result; -use futures::Future; -use paths; -use serde::{de::DeserializeOwned, Serialize}; -use std::path::PathBuf; - -/// Whenever we change the format of the file summaries on disk, we should increment this. -const FILE_SUMMARIES_VERSION: u32 = 1; - -/// Having all the summaries organized into a folder with their version number will make it -/// easier to clear oboslete caches, compared to having to read each individual file -/// to see what version it's using. -fn cache_dir() -> PathBuf { - paths::temp_dir() - .join("file_summaries") - .join(concat!("v{FILE_SUMMARIES_VERSION}")) -} - -fn cache_path_for_contents(content: impl AsRef<[u8]>) -> PathBuf { - let mut hasher = blake3::Hasher::new(); - - hasher.update(content.as_ref()); - - cache_dir().join(format!("{}.json", hasher.finalize().to_hex())) -} - -/// Given some source code, attempt to read its summary out of the cache. If there is no summary in the -/// cache, or it cannot be read for any reason, return None. -pub async fn get_cached_summary< - Summary: Serialize + DeserializeOwned, - SummarizeFuture: Future>, - Error, ->( - fs: &impl Fs, - source_code: impl AsRef<[u8]>, - summarize: impl FnOnce() -> SummarizeFuture, -) -> Result { - let get_cache_file_path = || cache_path_for_contents(source_code.as_ref()); - let cache_file_path = get_cache_file_path(); - let opt_summary = fs - .open_sync(&cache_file_path) - .await - .ok() - .and_then(|reader| serde_json::from_reader(reader).ok()); - - match opt_summary { - Some(cached_summary) => Ok(cached_summary), - None => { - // We couldn't open the cache file (either because it wasn't there or was corrupted or something; - // regardless, we don't have access to the cached data), so fall back on summarizing. - let summary = summarize().await?; - - let get_json = || { - serde_json::to_string(&summary) - .map_err(|serde_err| anyhow::Error::msg(serde_err.to_string())) - }; - - match get_json() { - Ok(json) => { - if let Err(_) = fs.atomic_write(cache_file_path, json).await { - // This might have errored for a harmless reason such as the cache dir having - // been deleted, so try to create that dir and retry. If it fails again, - // log an error but don't block the user. - if let Err(err) = retry_cache_write(fs, get_cache_file_path, get_json).await - { - log::error!( - "Error attempting to write summary to cache file {}: {:?}", - cache_path_for_contents(source_code).display(), - err - ) - } - } - } - Err(serde_err) => { - log::error!( - "Error trying to serialize source file contents as JSON. This is a programmer error and should never happen! {:?}", - serde_err - ) - } - } - - Ok(summary) - } - } -} - -async fn retry_cache_write( - fs: &impl Fs, - get_cache_file_path: impl FnOnce() -> PathBuf, - get_cache_file_contents: impl FnOnce() -> Result, -) -> Result<()> { - // Maybe the cache dir didn't exist; if not, create it. - // (It may have failed for a different reason, but the Fs trait doesn't - // currently include that info, so for now just always try this.) - // - // We don't want to do this defensively before every single file write - // to the cache, because it will be a waste of time essentially always - // except for the very first time we ever write to this cache dir. - fs.create_dir(&cache_dir()).await?; - - // Retry the file open, now that we've created the parent directory. - // If it still fails even after trying that, give up and return Err. - fs.atomic_write(get_cache_file_path(), get_cache_file_contents()?) - .await -} diff --git a/crates/summaries/src/index_summaries.rs b/crates/summaries/src/index_summaries.rs deleted file mode 100644 index e3c4455e97..0000000000 --- a/crates/summaries/src/index_summaries.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::completion_provider::CompletionProvider; -use crate::{cache_summaries::get_cached_summary, LanguageModelRequest}; -use crate::{LanguageModelRequestMessage, Role}; -use anyhow::{Context, Result}; -use fs::Fs; -use gpui::{AppContext, AsyncAppContext, Task}; -use smol::stream::StreamExt; -use std::{path::Path, sync::Arc}; - -const SUMMARY_PROMPT: &str = "Summarize this file:"; - -pub fn summarize_file(fs: &impl Fs, path: &Path, cx: &mut AsyncAppContext) -> Task> { - todo!() - // cx.background_executor().spawn(async move { - // let file_content = fs.load(path).await?; - - // get_cached_summary(fs, &file_content, || async { - // let provider = cx.update(|cx| CompletionProvider::global(cx))?; - // generate_summary(&file_content, provider).await - // }) - // .await - // }) -} - -async fn generate_summary(file_content: &str, provider: &CompletionProvider) -> Result { - let request = LanguageModelRequest { - model: provider.model(), - messages: vec![ - LanguageModelRequestMessage { - role: Role::System, - content: SUMMARY_PROMPT.to_string(), - }, - LanguageModelRequestMessage { - role: Role::User, - content: file_content.to_string(), - }, - ], - stop: vec![], - temperature: 0.7, - }; - let mut messages = provider.complete(request).await?; - let mut answer = String::new(); - - while let Some(chunk) = messages.next().await { - answer.push_str(&chunk?); - } - - Ok(answer) -} diff --git a/crates/summaries/src/summaries.rs b/crates/summaries/src/summaries.rs deleted file mode 100644 index 6ce82bcc6a..0000000000 --- a/crates/summaries/src/summaries.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod cache_summaries; -mod index_summaries; - -pub use cache_summaries::*; -pub use index_summaries::*;