From 22e8a67498363470eda3c802ee5bc5fa45bad2a8 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 28 Jul 2023 19:14:58 +0200 Subject: [PATCH] Load branch from kv store if necessary Co-Authored-By: Nathan Sobo Co-Authored-By: Max Brunsfeld --- crates/crdb/src/btree.rs | 4 +- crates/crdb/src/btree/map.rs | 21 ++++++++- crates/crdb/src/crdb.rs | 84 ++++++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 36 deletions(-) diff --git a/crates/crdb/src/btree.rs b/crates/crdb/src/btree.rs index 90024adf6f..51715eaed1 100644 --- a/crates/crdb/src/btree.rs +++ b/crates/crdb/src/btree.rs @@ -678,7 +678,7 @@ where T: Item + Serialize + for<'a> Deserialize<'a>, T::Summary: Serialize + for<'a> Deserialize<'a>, { - pub async fn from_root(root_id: SavedId, kv: &dyn KvStore) -> Result { + pub async fn load_root(root_id: SavedId, kv: &dyn KvStore) -> Result { let root = kv.load(namespace_bytes("node"), root_id.as_u128()).await?; let root = serde_bare::from_slice(&root)?; let node = match root { @@ -750,7 +750,7 @@ where start_summary: summary.clone(), }), ChildTree::Unloaded { saved_id } => { - let tree = Sequence::from_root(saved_id.clone(), kv).await?; + let tree = Sequence::load_root(saved_id.clone(), kv).await?; *child_tree = ChildTree::Loaded { tree }; if let ChildTree::Loaded { tree } = child_tree { stack.push(Frame { diff --git a/crates/crdb/src/btree/map.rs b/crates/crdb/src/btree/map.rs index 50e8e9afc5..f01c47e582 100644 --- a/crates/crdb/src/btree/map.rs +++ b/crates/crdb/src/btree/map.rs @@ -48,12 +48,29 @@ where Self(tree) } - pub async fn load(id: SavedId, kv: &dyn KvStore) -> Result + pub async fn load_root(id: SavedId, kv: &dyn KvStore) -> Result where K: Serialize + for<'de> Deserialize<'de>, V: Serialize + for<'de> Deserialize<'de>, { - Ok(Self(Sequence::from_root(id, kv).await?)) + Ok(Self(Sequence::load_root(id, kv).await?)) + } + + pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result> + where + K: Serialize + for<'de> Deserialize<'de>, + V: Serialize + for<'de> Deserialize<'de>, + { + self.0 + .load(kv, &(), |probe| { + let key_range = ( + Bound::Excluded(&probe.start.0), + Bound::Included(&probe.summary.0), + ); + key_range.contains(key) + }) + .await?; + Ok(self.get(key)) } pub async fn save(&self, kv: &dyn KvStore) -> Result diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index f5355d1f67..c84a9a2a7a 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -760,22 +760,36 @@ impl Repo { } } - fn branch(&self, name: &str) -> Result { - let branch_id = self - .read(|repo| { - Some( - *repo - .branches - .iter() - .find(|(_, branch)| branch.name.as_ref() == name)? - .0, - ) + fn load_branch(&self, name: &str) -> impl Future> { + let mut snapshot = self.read(|repo| repo.clone()); + let this = self.clone(); + let name = name.to_string(); + + async move { + let branch_id = *snapshot + .branch_ids_by_name + .load(&name, &*this.db.kv) + .await? + .ok_or_else(|| anyhow!("branch not found"))?; + + loop { + let latest_snapshot = this.read(|repo| repo.clone()); + let operations = latest_snapshot.operations_since(&snapshot.max_operation_ids); + if operations.is_empty() { + break; + } + snapshot.apply_operations(operations); + } + this.update(|latest_snapshot| { + *latest_snapshot = snapshot; + (None, ()) + }); + + Ok(Branch { + id: branch_id, + repo: this, }) - .ok_or_else(|| anyhow!("branch not found"))?; - Ok(Branch { - id: branch_id, - repo: self.clone(), - }) + } } fn branches(&self) -> Vec { @@ -859,7 +873,7 @@ impl Branch { }) } - pub fn document(&self, id: OperationId) -> Result { + pub fn load_document(&self, id: OperationId) -> Result { self.read(|revision| { revision .document_metadata @@ -896,7 +910,7 @@ impl Branch { .expect("branch must exist") .head .clone(); - let mut revision = repo.revision(&head).expect("revision must exist"); + let mut revision = repo.load_revision(&head).expect("revision must exist"); let operation_id = repo.last_operation_id.tick(); let (operation, result) = f(operation_id, head.clone(), &mut revision); repo.branches @@ -917,7 +931,7 @@ impl Branch { .expect("branch must exist") .head .clone(); - let revision = repo.revision(&head).expect("revision must exist"); + let revision = repo.load_revision(&head).expect("revision must exist"); f(&revision) }) } @@ -1575,6 +1589,8 @@ impl<'a> RopeBuilder<'a> { pub struct RepoSnapshot { last_operation_id: OperationId, branches: btree::Map, + // TODO: Change String to Arc for branch_ids_by_name + branch_ids_by_name: btree::Map, operations: btree::Map, revisions: btree::Map, max_operation_ids: btree::Map, @@ -1585,6 +1601,7 @@ pub struct RepoSnapshot { struct SavedRepoSnapshot { last_operation_id: OperationId, branches: btree::SavedId, + branch_ids_by_name: btree::SavedId, operations: btree::SavedId, max_operation_ids: btree::SavedId, deferred_operations: btree::SavedId, @@ -1595,6 +1612,7 @@ impl Default for RepoSnapshot { Self { last_operation_id: Default::default(), branches: Default::default(), + branch_ids_by_name: Default::default(), operations: Default::default(), revisions: btree::Map::from_ordered_entries([( RevisionId::default(), @@ -1621,11 +1639,12 @@ impl RepoSnapshot { let saved_repo = serde_bare::from_slice::(&repo_bytes)?; Ok(Self { last_operation_id: saved_repo.last_operation_id, - branches: btree::Map::load(saved_repo.branches, kv).await?, - operations: btree::Map::load(saved_repo.operations, kv).await?, + branches: btree::Map::load_root(saved_repo.branches, kv).await?, + branch_ids_by_name: btree::Map::load_root(saved_repo.branch_ids_by_name, kv).await?, + operations: btree::Map::load_root(saved_repo.operations, kv).await?, revisions: Default::default(), - max_operation_ids: btree::Map::load(saved_repo.max_operation_ids, kv).await?, - deferred_operations: btree::Sequence::from_root(saved_repo.deferred_operations, kv) + max_operation_ids: btree::Map::load_root(saved_repo.max_operation_ids, kv).await?, + deferred_operations: btree::Sequence::load_root(saved_repo.deferred_operations, kv) .await?, }) } @@ -1634,6 +1653,7 @@ impl RepoSnapshot { let saved_repo = SavedRepoSnapshot { last_operation_id: self.last_operation_id, branches: self.branches.save(kv).await?, + branch_ids_by_name: self.branch_ids_by_name.save(kv).await?, operations: self.operations.save(kv).await?, max_operation_ids: self.max_operation_ids.save(kv).await?, deferred_operations: self.deferred_operations.save(kv).await?, @@ -1749,7 +1769,7 @@ impl RepoSnapshot { // The following ensures that a revision for the branch head is always present. #[cfg(not(any(test, feature = "test-support")))] - if let Err(error) = self.revision(&new_head) { + if let Err(error) = self.load_revision(&new_head) { log::error!( "could not create revision for head {:?}: {:?}", new_head, @@ -1758,7 +1778,7 @@ impl RepoSnapshot { continue; } #[cfg(any(test, feature = "test-support"))] - self.revision(&new_head).unwrap(); + self.load_revision(&new_head).unwrap(); } else { for parent in operation.parent().iter() { self.deferred_operations.insert_or_replace( @@ -1807,7 +1827,7 @@ impl RepoSnapshot { self.operations.get(&operation_id) } - fn revision(&mut self, revision_id: &RevisionId) -> Result { + fn load_revision(&mut self, revision_id: &RevisionId) -> Result { // First, check if we have a revision cached for this revision id. // If not, we'll need to reconstruct it from a previous revision. // We need to find a cached revision that is an ancestor of the given revision id. @@ -1893,7 +1913,7 @@ impl RepoSnapshot { op.apply(&mut common_ancestor_revision); } Operation::Edit(op) => { - let parent_revision = self.revision(&op.parent)?; + let parent_revision = self.load_revision(&op.parent)?; op.apply(&parent_revision, &mut common_ancestor_revision)?; } Operation::CreateBranch(_) => { @@ -2164,10 +2184,10 @@ mod tests { ); let repo_b = client_b.clone_repo("repo-1").await.unwrap(); deterministic.run_until_parked(); - let branch_b = repo_b.branch("main").unwrap(); + let branch_b = repo_b.load_branch("main").await.unwrap(); - let doc1_b = branch_b.document(doc1_a.id).unwrap(); - let doc2_b = branch_b.document(doc2_a.id).unwrap(); + let doc1_b = branch_b.load_document(doc1_a.id).unwrap(); + let doc2_b = branch_b.load_document(doc2_a.id).unwrap(); assert_eq!(doc1_b.text().to_string(), "abc"); assert_eq!(doc2_b.text().to_string(), "def"); @@ -2555,7 +2575,7 @@ mod tests { .find(|c| c.id == *client_id) .ok_or_else(|| anyhow!("client not found"))?; let repo = client.repo(*repo_id).await?; - let branch = repo.branch(branch_name)?; + let branch = repo.load_branch(branch_name).await?; branch.create_document(); } @@ -2571,8 +2591,8 @@ mod tests { .find(|c| c.id == *client_id) .ok_or_else(|| anyhow!("client not found"))?; let repo = client.repo(*repo_id).await?; - let branch = repo.branch(branch_name)?; - let document = branch.document(*document_id)?; + let branch = repo.load_branch(branch_name).await?; + let document = branch.load_document(*document_id)?; document.edit(edits.iter().cloned()); } }