From 4848ea8e62de4aab05006cd5ff42ce72f104eafa Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 28 Jul 2023 13:08:33 +0200 Subject: [PATCH] Spawn a background task to save repo snapshots --- crates/crdb/src/btree.rs | 12 +++----- crates/crdb/src/btree/map.rs | 8 +++++ crates/crdb/src/crdb.rs | 57 +++++++++++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/crates/crdb/src/btree.rs b/crates/crdb/src/btree.rs index cc980ba6a3..90024adf6f 100644 --- a/crates/crdb/src/btree.rs +++ b/crates/crdb/src/btree.rs @@ -711,15 +711,14 @@ where Ok(Self(Arc::new(node))) } - pub async fn load( + pub async fn load( &mut self, - kv: &K, + kv: &dyn KvStore, cx: &::Context, mut f: F, ) -> Result<()> where F: FnMut(Probe) -> bool, - K: KvStore, { struct Frame<'a, T: Item> { tree: &'a mut Sequence, @@ -773,10 +772,7 @@ where Ok(()) } - pub async fn save(&self, kv: &K) -> Result<()> - where - K: KvStore, - { + pub async fn save(&self, kv: &dyn KvStore) -> Result { struct Frame<'a, T: Item> { node: &'a Node, children_saved: bool, @@ -861,7 +857,7 @@ where } } - Ok(()) + Ok(self.0.saved_id().clone()) } pub fn prune(&mut self, cx: &::Context, mut f: F) diff --git a/crates/crdb/src/btree/map.rs b/crates/crdb/src/btree/map.rs index 632a80caae..50e8e9afc5 100644 --- a/crates/crdb/src/btree/map.rs +++ b/crates/crdb/src/btree/map.rs @@ -56,6 +56,14 @@ where Ok(Self(Sequence::from_root(id, kv).await?)) } + pub async fn save(&self, kv: &dyn KvStore) -> Result + where + K: Serialize + for<'de> Deserialize<'de>, + V: Serialize + for<'de> Deserialize<'de>, + { + self.0.save(kv).await + } + pub fn is_empty(&self) -> bool { self.0.is_empty() } diff --git a/crates/crdb/src/crdb.rs b/crates/crdb/src/crdb.rs index 3ee4fb4dbd..f5355d1f67 100644 --- a/crates/crdb/src/crdb.rs +++ b/crates/crdb/src/crdb.rs @@ -213,6 +213,7 @@ struct Client { network: Arc, checkouts: Mutex>>, executor: Arc, + repo_snapshots: mpsc::UnboundedSender<(RepoId, RepoSnapshot)>, } struct Checkout { @@ -354,11 +355,13 @@ pub struct RoomCredentials { impl Client { pub fn new(executor: E, network: N, kv: Arc) -> Arc { Arc::new_cyclic(|weak_self: &Weak| { + let (repo_snapshots_tx, mut repo_snapshots_rx) = mpsc::unbounded(); let mut this = Self { db: Db::new(kv), network: Arc::new(network), checkouts: Default::default(), executor: Arc::new(executor), + repo_snapshots: repo_snapshots_tx, }; this.db.on_local_operation({ let this = weak_self.clone(); @@ -368,6 +371,29 @@ impl Client { } } }); + this.db.on_repo_snapshot_changed({ + let this = weak_self.clone(); + move |repo_id, snapshot| { + if let Some(this) = this.upgrade() { + let _ = this + .repo_snapshots + .unbounded_send((repo_id, snapshot.clone())); + } + } + }); + this.executor.spawn({ + let this = weak_self.clone(); + async move { + while let Some((repo_id, snapshot)) = repo_snapshots_rx.next().await { + if let Some(this) = this.upgrade() { + snapshot.save(repo_id, &*this.db.kv).await.log_err(); + } else { + break; + } + } + } + }); + this }) } @@ -680,6 +706,7 @@ pub struct Db { kv: Arc, snapshot: Arc>, local_operation_created: Option>, + repo_snapshot_changed: Option>, } impl Db { @@ -688,14 +715,19 @@ impl Db { kv, snapshot: Default::default(), local_operation_created: None, + repo_snapshot_changed: None, } } - fn on_local_operation( + fn on_local_operation(&mut self, callback: impl 'static + Send + Sync + Fn(RepoId, Operation)) { + self.local_operation_created = Some(Arc::new(callback)); + } + + fn on_repo_snapshot_changed( &mut self, - operation_created: impl 'static + Send + Sync + Fn(RepoId, Operation), + callback: impl 'static + Send + Sync + Fn(RepoId, &RepoSnapshot), ) { - self.local_operation_created = Some(Arc::new(operation_created)); + self.repo_snapshot_changed = Some(Arc::new(callback)); } fn repo(&self, id: RepoId) -> Option { @@ -788,6 +820,10 @@ impl Repo { } } + if let Some(repo_snapshot_changed) = self.db.repo_snapshot_changed.as_ref() { + repo_snapshot_changed(self.id, repo); + } + result }) .expect("repo must exist") @@ -1550,7 +1586,6 @@ struct SavedRepoSnapshot { last_operation_id: OperationId, branches: btree::SavedId, operations: btree::SavedId, - revisions: btree::SavedId, max_operation_ids: btree::SavedId, deferred_operations: btree::SavedId, } @@ -1595,6 +1630,20 @@ impl RepoSnapshot { }) } + async fn save(&self, id: RepoId, kv: &dyn KvStore) -> Result<()> { + let saved_repo = SavedRepoSnapshot { + last_operation_id: self.last_operation_id, + branches: self.branches.save(kv).await?, + operations: self.operations.save(kv).await?, + max_operation_ids: self.max_operation_ids.save(kv).await?, + deferred_operations: self.deferred_operations.save(kv).await?, + }; + let repo_bytes = serde_bare::to_vec(&saved_repo)?; + kv.store(btree::namespace_bytes("repo"), id.0.as_u128(), repo_bytes) + .await?; + Ok(()) + } + fn create_empty_branch(&mut self, name: impl Into>) -> (Operation, OperationId) { let name = name.into(); let branch_id = self.last_operation_id.tick();