Load branch from kv store if necessary

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>
This commit is contained in:
Antonio Scandurra
2023-07-28 19:14:58 +02:00
parent 4848ea8e62
commit 22e8a67498
3 changed files with 73 additions and 36 deletions

View File

@@ -678,7 +678,7 @@ where
T: Item + Serialize + for<'a> Deserialize<'a>,
T::Summary: Serialize + for<'a> Deserialize<'a>,
{
pub async fn from_root(root_id: SavedId, kv: &dyn KvStore) -> Result<Self> {
pub async fn load_root(root_id: SavedId, kv: &dyn KvStore) -> Result<Self> {
let root = kv.load(namespace_bytes("node"), root_id.as_u128()).await?;
let root = serde_bare::from_slice(&root)?;
let node = match root {
@@ -750,7 +750,7 @@ where
start_summary: summary.clone(),
}),
ChildTree::Unloaded { saved_id } => {
let tree = Sequence::from_root(saved_id.clone(), kv).await?;
let tree = Sequence::load_root(saved_id.clone(), kv).await?;
*child_tree = ChildTree::Loaded { tree };
if let ChildTree::Loaded { tree } = child_tree {
stack.push(Frame {

View File

@@ -48,12 +48,29 @@ where
Self(tree)
}
pub async fn load(id: SavedId, kv: &dyn KvStore) -> Result<Self>
pub async fn load_root(id: SavedId, kv: &dyn KvStore) -> Result<Self>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
Ok(Self(Sequence::from_root(id, kv).await?))
Ok(Self(Sequence::load_root(id, kv).await?))
}
pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result<Option<&V>>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(&probe.start.0),
Bound::Included(&probe.summary.0),
);
key_range.contains(key)
})
.await?;
Ok(self.get(key))
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedId>

View File

@@ -760,22 +760,36 @@ impl Repo {
}
}
fn branch(&self, name: &str) -> Result<Branch> {
let branch_id = self
.read(|repo| {
Some(
*repo
.branches
.iter()
.find(|(_, branch)| branch.name.as_ref() == name)?
.0,
)
fn load_branch(&self, name: &str) -> impl Future<Output = Result<Branch>> {
let mut snapshot = self.read(|repo| repo.clone());
let this = self.clone();
let name = name.to_string();
async move {
let branch_id = *snapshot
.branch_ids_by_name
.load(&name, &*this.db.kv)
.await?
.ok_or_else(|| anyhow!("branch not found"))?;
loop {
let latest_snapshot = this.read(|repo| repo.clone());
let operations = latest_snapshot.operations_since(&snapshot.max_operation_ids);
if operations.is_empty() {
break;
}
snapshot.apply_operations(operations);
}
this.update(|latest_snapshot| {
*latest_snapshot = snapshot;
(None, ())
});
Ok(Branch {
id: branch_id,
repo: this,
})
.ok_or_else(|| anyhow!("branch not found"))?;
Ok(Branch {
id: branch_id,
repo: self.clone(),
})
}
}
fn branches(&self) -> Vec<Branch> {
@@ -859,7 +873,7 @@ impl Branch {
})
}
pub fn document(&self, id: OperationId) -> Result<Document> {
pub fn load_document(&self, id: OperationId) -> Result<Document> {
self.read(|revision| {
revision
.document_metadata
@@ -896,7 +910,7 @@ impl Branch {
.expect("branch must exist")
.head
.clone();
let mut revision = repo.revision(&head).expect("revision must exist");
let mut revision = repo.load_revision(&head).expect("revision must exist");
let operation_id = repo.last_operation_id.tick();
let (operation, result) = f(operation_id, head.clone(), &mut revision);
repo.branches
@@ -917,7 +931,7 @@ impl Branch {
.expect("branch must exist")
.head
.clone();
let revision = repo.revision(&head).expect("revision must exist");
let revision = repo.load_revision(&head).expect("revision must exist");
f(&revision)
})
}
@@ -1575,6 +1589,8 @@ impl<'a> RopeBuilder<'a> {
pub struct RepoSnapshot {
last_operation_id: OperationId,
branches: btree::Map<OperationId, BranchSnapshot>,
// TODO: Change String to Arc<str> for branch_ids_by_name
branch_ids_by_name: btree::Map<String, OperationId>,
operations: btree::Map<OperationId, Operation>,
revisions: btree::Map<RevisionId, Revision>,
max_operation_ids: btree::Map<ReplicaId, OperationCount>,
@@ -1585,6 +1601,7 @@ pub struct RepoSnapshot {
struct SavedRepoSnapshot {
last_operation_id: OperationId,
branches: btree::SavedId,
branch_ids_by_name: btree::SavedId,
operations: btree::SavedId,
max_operation_ids: btree::SavedId,
deferred_operations: btree::SavedId,
@@ -1595,6 +1612,7 @@ impl Default for RepoSnapshot {
Self {
last_operation_id: Default::default(),
branches: Default::default(),
branch_ids_by_name: Default::default(),
operations: Default::default(),
revisions: btree::Map::from_ordered_entries([(
RevisionId::default(),
@@ -1621,11 +1639,12 @@ impl RepoSnapshot {
let saved_repo = serde_bare::from_slice::<SavedRepoSnapshot>(&repo_bytes)?;
Ok(Self {
last_operation_id: saved_repo.last_operation_id,
branches: btree::Map::load(saved_repo.branches, kv).await?,
operations: btree::Map::load(saved_repo.operations, kv).await?,
branches: btree::Map::load_root(saved_repo.branches, kv).await?,
branch_ids_by_name: btree::Map::load_root(saved_repo.branch_ids_by_name, kv).await?,
operations: btree::Map::load_root(saved_repo.operations, kv).await?,
revisions: Default::default(),
max_operation_ids: btree::Map::load(saved_repo.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::from_root(saved_repo.deferred_operations, kv)
max_operation_ids: btree::Map::load_root(saved_repo.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::load_root(saved_repo.deferred_operations, kv)
.await?,
})
}
@@ -1634,6 +1653,7 @@ impl RepoSnapshot {
let saved_repo = SavedRepoSnapshot {
last_operation_id: self.last_operation_id,
branches: self.branches.save(kv).await?,
branch_ids_by_name: self.branch_ids_by_name.save(kv).await?,
operations: self.operations.save(kv).await?,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
@@ -1749,7 +1769,7 @@ impl RepoSnapshot {
// The following ensures that a revision for the branch head is always present.
#[cfg(not(any(test, feature = "test-support")))]
if let Err(error) = self.revision(&new_head) {
if let Err(error) = self.load_revision(&new_head) {
log::error!(
"could not create revision for head {:?}: {:?}",
new_head,
@@ -1758,7 +1778,7 @@ impl RepoSnapshot {
continue;
}
#[cfg(any(test, feature = "test-support"))]
self.revision(&new_head).unwrap();
self.load_revision(&new_head).unwrap();
} else {
for parent in operation.parent().iter() {
self.deferred_operations.insert_or_replace(
@@ -1807,7 +1827,7 @@ impl RepoSnapshot {
self.operations.get(&operation_id)
}
fn revision(&mut self, revision_id: &RevisionId) -> Result<Revision> {
fn load_revision(&mut self, revision_id: &RevisionId) -> Result<Revision> {
// First, check if we have a revision cached for this revision id.
// If not, we'll need to reconstruct it from a previous revision.
// We need to find a cached revision that is an ancestor of the given revision id.
@@ -1893,7 +1913,7 @@ impl RepoSnapshot {
op.apply(&mut common_ancestor_revision);
}
Operation::Edit(op) => {
let parent_revision = self.revision(&op.parent)?;
let parent_revision = self.load_revision(&op.parent)?;
op.apply(&parent_revision, &mut common_ancestor_revision)?;
}
Operation::CreateBranch(_) => {
@@ -2164,10 +2184,10 @@ mod tests {
);
let repo_b = client_b.clone_repo("repo-1").await.unwrap();
deterministic.run_until_parked();
let branch_b = repo_b.branch("main").unwrap();
let branch_b = repo_b.load_branch("main").await.unwrap();
let doc1_b = branch_b.document(doc1_a.id).unwrap();
let doc2_b = branch_b.document(doc2_a.id).unwrap();
let doc1_b = branch_b.load_document(doc1_a.id).unwrap();
let doc2_b = branch_b.load_document(doc2_a.id).unwrap();
assert_eq!(doc1_b.text().to_string(), "abc");
assert_eq!(doc2_b.text().to_string(), "def");
@@ -2555,7 +2575,7 @@ mod tests {
.find(|c| c.id == *client_id)
.ok_or_else(|| anyhow!("client not found"))?;
let repo = client.repo(*repo_id).await?;
let branch = repo.branch(branch_name)?;
let branch = repo.load_branch(branch_name).await?;
branch.create_document();
}
@@ -2571,8 +2591,8 @@ mod tests {
.find(|c| c.id == *client_id)
.ok_or_else(|| anyhow!("client not found"))?;
let repo = client.repo(*repo_id).await?;
let branch = repo.branch(branch_name)?;
let document = branch.document(*document_id)?;
let branch = repo.load_branch(branch_name).await?;
let document = branch.load_document(*document_id)?;
document.edit(edits.iter().cloned());
}
}