Index files as they change on disk

This commit is contained in:
Antonio Scandurra
2024-04-08 11:06:11 +02:00
committed by Kyle Kelley
parent 3007fb51c3
commit 4ff38fc823
2 changed files with 139 additions and 56 deletions

View File

@@ -4,8 +4,9 @@ use futures::task::waker;
use gpui::{App, Global, TestAppContext};
use language::language_settings::AllLanguageSettings;
use project::Project;
use semantic_index::embedding::{EmbeddingModel, FakeEmbeddingProvider, OpenaiEmbeddingProvider};
use semantic_index::SemanticIndex;
use semantic_index::{
EmbeddingModel, FakeEmbeddingProvider, OpenaiEmbeddingProvider, SemanticIndex,
};
use settings::SettingsStore;
use std::fs;
use std::io::{self, Read, Seek, SeekFrom};

View File

@@ -1,10 +1,10 @@
mod chunking;
pub mod embedding;
mod embedding;
use anyhow::{anyhow, Context as _, Result};
use chunking::{chunk_text, Chunk};
use collections::{Bound, HashMap};
use embedding::{Embedding, EmbeddingProvider};
pub use embedding::*;
use fs::Fs;
use futures::stream::StreamExt;
use futures_batch::ChunksTimeoutStreamExt;
@@ -14,11 +14,12 @@ use gpui::{
};
use heed::types::{SerdeBincode, Str};
use language::LanguageRegistry;
use project::{Entry, Project, Worktree};
use project::{Entry, Project, UpdatedEntriesSet, Worktree};
use serde::{Deserialize, Serialize};
use smol::channel;
use std::{
cmp::Ordering,
future::Future,
ops::Range,
path::Path,
sync::Arc,
@@ -48,7 +49,7 @@ impl SemanticIndex {
.spawn(async move {
unsafe {
heed::EnvOpenOptions::new()
.map_size(128 * 1024 * 1024) // todo!("revisit this value, we shouldn't be writing too much at once")
.map_size(1024 * 1024 * 1024)
.max_dbs(3000)
.open(db_path)
}
@@ -244,9 +245,10 @@ struct WorktreeIndex {
db: heed::Database<Str, SerdeBincode<EmbeddedFile>>,
language_registry: Arc<LanguageRegistry>,
fs: Arc<dyn Fs>,
status: Status,
index_entries: Task<Result<()>>,
embedding_provider: Arc<dyn EmbeddingProvider>,
status: Status,
_index_entries: Task<Result<()>>,
_subscription: Subscription,
}
impl WorktreeIndex {
@@ -296,54 +298,93 @@ impl WorktreeIndex {
embedding_provider: Arc<dyn EmbeddingProvider>,
cx: &mut ModelContext<Self>,
) -> Self {
let (updated_entries_tx, updated_entries_rx) = channel::unbounded();
let _subscription = cx.subscribe(&worktree, move |_this, _worktree, event, _cx| {
if let worktree::Event::UpdatedEntries(update) = event {
_ = updated_entries_tx.try_send(update.clone());
}
});
Self {
db_connection,
db,
worktree,
language_registry,
fs,
status: Status::Idle,
index_entries: cx.spawn(Self::index_entries),
embedding_provider,
status: Status::Idle,
_index_entries: cx.spawn(|this, cx| Self::index_entries(this, updated_entries_rx, cx)),
_subscription,
}
}
async fn index_entries(this: WeakModel<Self>, mut cx: AsyncAppContext) -> Result<()> {
this.update(&mut cx, |this, cx| {
this.status = Status::Scanning;
async fn index_entries(
this: WeakModel<Self>,
updated_entries: channel::Receiver<UpdatedEntriesSet>,
mut cx: AsyncAppContext,
) -> Result<()> {
let index = this.update(&mut cx, |this, cx| {
cx.notify();
this.status = Status::Scanning;
this.index_entries_changed_on_disk(cx)
})?;
let worktree = this.read_with(&cx, |this, cx| {
this.worktree.read(cx).as_local().unwrap().snapshot()
})?;
let worktree_abs_path = worktree.abs_path().clone();
let scan = Self::scan_entries(&this, worktree.clone(), &mut cx)?;
let chunk = Self::chunk_files(&this, worktree_abs_path, scan.updated_entries, &mut cx)?;
let embed = Self::embed_files(&this, chunk.files, &mut cx)?;
let persist =
Self::persist_embeddings(&this, scan.deleted_entry_ranges, embed.files, &mut cx)?;
futures::try_join!(scan.task, chunk.task, embed.task, persist).log_err();
index.await.log_err();
this.update(&mut cx, |this, cx| {
this.status = Status::Idle;
cx.notify();
})?;
// todo!(React to files changing and re-run the pipeline above only for the changed entries)
while let Ok(updated_entries) = updated_entries.recv().await {
let index = this.update(&mut cx, |this, cx| {
cx.notify();
this.status = Status::Scanning;
this.index_updated_entries(updated_entries, cx)
})?;
index.await.log_err();
this.update(&mut cx, |this, cx| {
this.status = Status::Idle;
cx.notify();
})?;
}
Ok(())
}
fn scan_entries(
this: &WeakModel<Self>,
worktree: LocalSnapshot,
cx: &mut AsyncAppContext,
) -> Result<ScanEntries> {
/// Builds the full-scan indexing pipeline (scan → chunk → embed → persist)
/// over the current worktree snapshot and returns a future that resolves
/// once every stage has completed.
fn index_entries_changed_on_disk(&self, cx: &AppContext) -> impl Future<Output = Result<()>> {
    let snapshot = self.worktree.read(cx).as_local().unwrap().snapshot();
    let root_abs_path = snapshot.abs_path().clone();

    // The stages run concurrently, wired together by bounded channels.
    let scan = self.scan_entries(snapshot.clone(), cx);
    let chunk = self.chunk_files(root_abs_path, scan.updated_entries, cx);
    let embed = self.embed_files(chunk.files, cx);
    let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);

    async move {
        futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
        Ok(())
    }
}
/// Builds an incremental indexing pipeline seeded from `updated_entries`
/// rather than a full worktree scan, returning a future that resolves when
/// all stages are done.
fn index_updated_entries(
    &self,
    updated_entries: UpdatedEntriesSet,
    cx: &AppContext,
) -> impl Future<Output = Result<()>> {
    let snapshot = self.worktree.read(cx).as_local().unwrap().snapshot();
    let root_abs_path = snapshot.abs_path().clone();

    // Same stage layout as the full scan; only the scan source differs.
    let scan = self.scan_updated_entries(snapshot, updated_entries, cx);
    let chunk = self.chunk_files(root_abs_path, scan.updated_entries, cx);
    let embed = self.embed_files(chunk.files, cx);
    let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx);

    async move {
        futures::try_join!(scan.task, chunk.task, embed.task, persist)?;
        Ok(())
    }
}
fn scan_entries(&self, worktree: LocalSnapshot, cx: &AppContext) -> ScanEntries {
let (updated_entries_tx, updated_entries_rx) = channel::bounded(512);
let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128);
let (db_connection, db) =
this.read_with(cx, |this, _| (this.db_connection.clone(), this.db.clone()))?;
let db_connection = self.db_connection.clone();
let db = self.db.clone();
let task = cx.background_executor().spawn(async move {
let txn = db_connection
.read_txn()
@@ -408,21 +449,61 @@ impl WorktreeIndex {
Ok(())
});
Ok(ScanEntries {
ScanEntries {
updated_entries: updated_entries_rx,
deleted_entry_ranges: deleted_entry_ranges_rx,
task,
})
}
}
/// Translates a worktree change set into pipeline inputs: added/updated
/// entries are forwarded for re-chunking and re-embedding, while removed
/// paths become single-key ranges to delete from the database.
fn scan_updated_entries(
    &self,
    worktree: LocalSnapshot,
    updated_entries: UpdatedEntriesSet,
    cx: &AppContext,
) -> ScanEntries {
    let (entries_tx, entries_rx) = channel::bounded(512);
    let (deletions_tx, deletions_rx) = channel::bounded(128);

    let task = cx.background_executor().spawn(async move {
        for (path, entry_id, change) in updated_entries.iter() {
            match change {
                // Loaded entries carry no on-disk change; nothing to do.
                project::PathChange::Loaded => {}
                project::PathChange::Removed => {
                    // A removal maps to an inclusive range covering exactly
                    // this path's key in the db.
                    let db_path = db_key_for_path(path);
                    deletions_tx
                        .send((Bound::Included(db_path.clone()), Bound::Included(db_path)))
                        .await?;
                }
                project::PathChange::Added
                | project::PathChange::Updated
                | project::PathChange::AddedOrUpdated => {
                    // Only forward the entry if it still exists in the
                    // snapshot we were handed.
                    if let Some(entry) = worktree.entry_for_id(*entry_id) {
                        entries_tx.send(entry.clone()).await?;
                    }
                }
            }
        }
        Ok(())
    });

    ScanEntries {
        updated_entries: entries_rx,
        deleted_entry_ranges: deletions_rx,
        task,
    }
}
fn chunk_files(
this: &WeakModel<Self>,
&self,
worktree_abs_path: Arc<Path>,
entries: channel::Receiver<Entry>,
cx: &mut AsyncAppContext,
) -> Result<ChunkFiles> {
let language_registry = this.read_with(cx, |this, _| this.language_registry.clone())?;
let fs = this.read_with(cx, |this, _| this.fs.clone())?;
cx: &AppContext,
) -> ChunkFiles {
let language_registry = self.language_registry.clone();
let fs = self.fs.clone();
let (chunked_files_tx, chunked_files_rx) = channel::bounded(2048);
let task = cx.spawn(|cx| async move {
cx.background_executor()
@@ -457,20 +538,20 @@ impl WorktreeIndex {
.await;
Ok(())
});
Ok(ChunkFiles {
ChunkFiles {
files: chunked_files_rx,
task,
})
}
}
fn embed_files(
this: &WeakModel<Self>,
&self,
chunked_files: channel::Receiver<ChunkedFile>,
cx: &mut AsyncAppContext,
) -> Result<EmbedFiles> {
let embedding_provider = this.read_with(cx, |this, _| this.embedding_provider.clone())?;
cx: &AppContext,
) -> EmbedFiles {
let embedding_provider = self.embedding_provider.clone();
let (embedded_files_tx, embedded_files_rx) = channel::bounded(512);
let task = cx.background_executor().spawn(async move {
let mut chunked_file_batches =
chunked_files.chunks_timeout(512, Duration::from_secs(2));
@@ -517,21 +598,22 @@ impl WorktreeIndex {
}
Ok(())
});
Ok(EmbedFiles {
EmbedFiles {
files: embedded_files_rx,
task,
})
}
}
fn persist_embeddings(
this: &WeakModel<Self>,
&self,
mut deleted_entry_ranges: channel::Receiver<(Bound<String>, Bound<String>)>,
embedded_files: channel::Receiver<EmbeddedFile>,
cx: &mut AsyncAppContext,
) -> Result<Task<Result<()>>> {
let (db_connection, db) =
this.read_with(cx, |this, _| (this.db_connection.clone(), this.db.clone()))?;
Ok(cx.background_executor().spawn(async move {
cx: &AppContext,
) -> Task<Result<()>> {
let db_connection = self.db_connection.clone();
let db = self.db.clone();
cx.background_executor().spawn(async move {
while let Some(deletion_range) = deleted_entry_ranges.next().await {
let mut txn = db_connection.write_txn()?;
let start = deletion_range.0.as_ref().map(|start| start.as_str());
@@ -554,7 +636,7 @@ impl WorktreeIndex {
}
Ok(())
}))
})
}
fn search(