From 4ff38fc823aef3fcef00dbece4c3d1763ecbdc72 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 8 Apr 2024 11:06:11 +0200 Subject: [PATCH] Index files as they change on disk --- crates/semantic_index/examples/index.rs | 5 +- crates/semantic_index/src/semantic_index.rs | 190 ++++++++++++++------ 2 files changed, 139 insertions(+), 56 deletions(-) diff --git a/crates/semantic_index/examples/index.rs b/crates/semantic_index/examples/index.rs index 96c329e364..45bf71d516 100644 --- a/crates/semantic_index/examples/index.rs +++ b/crates/semantic_index/examples/index.rs @@ -4,8 +4,9 @@ use futures::task::waker; use gpui::{App, Global, TestAppContext}; use language::language_settings::AllLanguageSettings; use project::Project; -use semantic_index::embedding::{EmbeddingModel, FakeEmbeddingProvider, OpenaiEmbeddingProvider}; -use semantic_index::SemanticIndex; +use semantic_index::{ + EmbeddingModel, FakeEmbeddingProvider, OpenaiEmbeddingProvider, SemanticIndex, +}; use settings::SettingsStore; use std::fs; use std::io::{self, Read, Seek, SeekFrom}; diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index ddaf6a74c2..0f9c0c1659 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -1,10 +1,10 @@ mod chunking; -pub mod embedding; +mod embedding; use anyhow::{anyhow, Context as _, Result}; use chunking::{chunk_text, Chunk}; use collections::{Bound, HashMap}; -use embedding::{Embedding, EmbeddingProvider}; +pub use embedding::*; use fs::Fs; use futures::stream::StreamExt; use futures_batch::ChunksTimeoutStreamExt; @@ -14,11 +14,12 @@ use gpui::{ }; use heed::types::{SerdeBincode, Str}; use language::LanguageRegistry; -use project::{Entry, Project, Worktree}; +use project::{Entry, Project, UpdatedEntriesSet, Worktree}; use serde::{Deserialize, Serialize}; use smol::channel; use std::{ cmp::Ordering, + future::Future, ops::Range, path::Path, sync::Arc, @@ -48,7 +49,7 @@ impl SemanticIndex { .spawn(async move { unsafe { heed::EnvOpenOptions::new() - .map_size(128 * 1024 * 1024) // todo!("revisit this value, we shouldn't be writing too much at once") + .map_size(1024 * 1024 * 1024) .max_dbs(3000) .open(db_path) } @@ -244,9 +245,10 @@ struct WorktreeIndex { db: heed::Database>, language_registry: Arc, fs: Arc, - status: Status, - index_entries: Task>, embedding_provider: Arc, + status: Status, + _index_entries: Task>, + _subscription: Subscription, } impl WorktreeIndex { @@ -296,54 +298,93 @@ impl WorktreeIndex { embedding_provider: Arc, cx: &mut ModelContext, ) -> Self { + let (updated_entries_tx, updated_entries_rx) = channel::unbounded(); + let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| { + if let worktree::Event::UpdatedEntries(update) = event { + _ = updated_entries_tx.try_send(update.clone()); + } + }); + Self { db_connection, db, worktree, language_registry, fs, - status: Status::Idle, - index_entries: cx.spawn(Self::index_entries), embedding_provider, + status: Status::Idle, + _index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)), + _subscription, } } - async fn index_entries(this: WeakModel, mut cx: AsyncAppContext) -> Result<()> { - this.update(&mut cx, |this, cx| { - this.status = Status::Scanning; + async fn index_entries( + this: WeakModel, + updated_entries: channel::Receiver, + mut cx: AsyncAppContext, + ) -> Result<()> { + let index = this.update(&mut cx, |this, cx| { cx.notify(); + this.status = Status::Scanning; + this.index_entries_changed_on_disk(cx) })?; - - let worktree = this.read_with(&cx, |this, cx| { - this.worktree.read(cx).as_local().unwrap().snapshot() - })?; - let worktree_abs_path = worktree.abs_path().clone(); - let scan = Self::scan_entries(&this, worktree.clone(), &mut cx)?; - let chunk = Self::chunk_files(&this, worktree_abs_path, scan.updated_entries, &mut cx)?; - let embed = Self::embed_files(&this, chunk.files, &mut cx)?; - let persist = - Self::persist_embeddings(&this, scan.deleted_entry_ranges, embed.files, &mut cx)?; - - futures::try_join!(scan.task, chunk.task, embed.task, persist).log_err(); + index.await.log_err(); this.update(&mut cx, |this, cx| { this.status = Status::Idle; cx.notify(); })?; - // todo!(React to files changing and re-run the pipeline above only for the changed entries) + while let Ok(updated_entries) = updated_entries.recv().await { + let index = this.update(&mut cx, |this, cx| { + cx.notify(); + this.status = Status::Scanning; + this.index_updated_entries(updated_entries, cx) + })?; + index.await.log_err(); + this.update(&mut cx, |this, cx| { + this.status = Status::Idle; + cx.notify(); + })?; + } Ok(()) } - fn scan_entries( - this: &WeakModel, - worktree: LocalSnapshot, - cx: &mut AsyncAppContext, - ) -> Result { + fn index_entries_changed_on_disk(&self, cx: &AppContext) -> impl Future> { + let worktree = self.worktree.read(cx).as_local().unwrap().snapshot(); + let worktree_abs_path = worktree.abs_path().clone(); + let scan = self.scan_entries(worktree.clone(), cx); + let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); + let embed = self.embed_files(chunk.files, cx); + let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); + async move { + futures::try_join!(scan.task, chunk.task, embed.task, persist)?; + Ok(()) + } + } + + fn index_updated_entries( + &self, + updated_entries: UpdatedEntriesSet, + cx: &AppContext, + ) -> impl Future> { + let worktree = self.worktree.read(cx).as_local().unwrap().snapshot(); + let worktree_abs_path = worktree.abs_path().clone(); + let scan = self.scan_updated_entries(worktree, updated_entries, cx); + let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); + let embed = self.embed_files(chunk.files, cx); + let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); + async move { + futures::try_join!(scan.task, chunk.task, embed.task, persist)?; + Ok(()) + } + } + + fn scan_entries(&self, worktree: LocalSnapshot, cx: &AppContext) -> ScanEntries { let (updated_entries_tx, updated_entries_rx) = channel::bounded(512); let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); - let (db_connection, db) = - this.read_with(cx, |this, _| (this.db_connection.clone(), this.db.clone()))?; + let db_connection = self.db_connection.clone(); + let db = self.db.clone(); let task = cx.background_executor().spawn(async move { let txn = db_connection .read_txn() @@ -408,21 +449,61 @@ impl WorktreeIndex { Ok(()) }); - Ok(ScanEntries { + ScanEntries { updated_entries: updated_entries_rx, deleted_entry_ranges: deleted_entry_ranges_rx, task, - }) + } + } + + fn scan_updated_entries( + &self, + worktree: LocalSnapshot, + updated_entries: UpdatedEntriesSet, + cx: &AppContext, + ) -> ScanEntries { + let (updated_entries_tx, updated_entries_rx) = channel::bounded(512); + let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); + let task = cx.background_executor().spawn(async move { + for (path, entry_id, status) in updated_entries.iter() { + match status { + project::PathChange::Added + | project::PathChange::Updated + | project::PathChange::AddedOrUpdated => { + if let Some(entry) = worktree.entry_for_id(*entry_id) { + updated_entries_tx.send(entry.clone()).await?; + } + } + project::PathChange::Removed => { + let db_path = db_key_for_path(path); + deleted_entry_ranges_tx + .send((Bound::Included(db_path.clone()), Bound::Included(db_path))) + .await?; + } + project::PathChange::Loaded => { + // Do nothing. + } + } + } + + Ok(()) + }); + + ScanEntries { + updated_entries: updated_entries_rx, + deleted_entry_ranges: deleted_entry_ranges_rx, + task, + } } fn chunk_files( - this: &WeakModel, + &self, worktree_abs_path: Arc, entries: channel::Receiver, - cx: &mut AsyncAppContext, - ) -> Result { - let language_registry = this.read_with(cx, |this, _| this.language_registry.clone())?; - let fs = this.read_with(cx, |this, _| this.fs.clone())?; + cx: &AppContext, + ) -> ChunkFiles { + let language_registry = self.language_registry.clone(); + let fs = self.fs.clone(); let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048); let task = cx.spawn(|cx| async move { cx.background_executor() @@ -457,20 +538,20 @@ impl WorktreeIndex { .await; Ok(()) }); - Ok(ChunkFiles { + + ChunkFiles { files: chunked_files_rx, task, - }) + } } fn embed_files( - this: &WeakModel, + &self, chunked_files: channel::Receiver, - cx: &mut AsyncAppContext, - ) -> Result { - let embedding_provider = this.read_with(cx, |this, _| this.embedding_provider.clone())?; + cx: &AppContext, + ) -> EmbedFiles { + let embedding_provider = self.embedding_provider.clone(); let (embedded_files_tx, embedded_files_rx) = channel::bounded(512); - let task = cx.background_executor().spawn(async move { let mut chunked_file_batches = chunked_files.chunks_timeout(512, Duration::from_secs(2)); @@ -517,21 +598,22 @@ impl WorktreeIndex { } Ok(()) }); - Ok(EmbedFiles { + + EmbedFiles { files: embedded_files_rx, task, - }) + } } fn persist_embeddings( - this: &WeakModel, + &self, mut deleted_entry_ranges: channel::Receiver<(Bound, Bound)>, embedded_files: channel::Receiver, - cx: &mut AsyncAppContext, - ) -> Result>> { - let (db_connection, db) = - this.read_with(cx, |this, _| (this.db_connection.clone(), this.db.clone()))?; - Ok(cx.background_executor().spawn(async move { + cx: &AppContext, + ) -> Task> { + let db_connection = self.db_connection.clone(); + let db = self.db.clone(); + cx.background_executor().spawn(async move { while let Some(deletion_range) = deleted_entry_ranges.next().await { let mut txn = db_connection.write_txn()?; let start = deletion_range.0.as_ref().map(|start| start.as_str()); @@ -554,7 +636,7 @@ impl WorktreeIndex { } Ok(()) - })) + }) } fn search(