Spawn a background task to save repo snapshots

This commit is contained in:
Antonio Scandurra
2023-07-28 13:08:33 +02:00
parent 3349f2147a
commit 4848ea8e62
3 changed files with 65 additions and 12 deletions

View File

@@ -711,15 +711,14 @@ where
Ok(Self(Arc::new(node)))
}
pub async fn load<F, K>(
pub async fn load<F>(
&mut self,
kv: &K,
kv: &dyn KvStore,
cx: &<T::Summary as Summary>::Context,
mut f: F,
) -> Result<()>
where
F: FnMut(Probe<T::Summary>) -> bool,
K: KvStore,
{
struct Frame<'a, T: Item> {
tree: &'a mut Sequence<T>,
@@ -773,10 +772,7 @@ where
Ok(())
}
pub async fn save<K>(&self, kv: &K) -> Result<()>
where
K: KvStore,
{
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedId> {
struct Frame<'a, T: Item> {
node: &'a Node<T>,
children_saved: bool,
@@ -861,7 +857,7 @@ where
}
}
Ok(())
Ok(self.0.saved_id().clone())
}
pub fn prune<F>(&mut self, cx: &<T::Summary as Summary>::Context, mut f: F)

View File

@@ -56,6 +56,14 @@ where
Ok(Self(Sequence::from_root(id, kv).await?))
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedId>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0.save(kv).await
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

View File

@@ -213,6 +213,7 @@ struct Client<E, N: ClientNetwork> {
network: Arc<N>,
checkouts: Mutex<HashMap<RepoId, Checkout<E, N>>>,
executor: Arc<E>,
repo_snapshots: mpsc::UnboundedSender<(RepoId, RepoSnapshot)>,
}
struct Checkout<E, N: ClientNetwork> {
@@ -354,11 +355,13 @@ pub struct RoomCredentials {
impl<E: Executor, N: ClientNetwork> Client<E, N> {
pub fn new(executor: E, network: N, kv: Arc<dyn KvStore>) -> Arc<Self> {
Arc::new_cyclic(|weak_self: &Weak<Self>| {
let (repo_snapshots_tx, mut repo_snapshots_rx) = mpsc::unbounded();
let mut this = Self {
db: Db::new(kv),
network: Arc::new(network),
checkouts: Default::default(),
executor: Arc::new(executor),
repo_snapshots: repo_snapshots_tx,
};
this.db.on_local_operation({
let this = weak_self.clone();
@@ -368,6 +371,29 @@ impl<E: Executor, N: ClientNetwork> Client<E, N> {
}
}
});
this.db.on_repo_snapshot_changed({
let this = weak_self.clone();
move |repo_id, snapshot| {
if let Some(this) = this.upgrade() {
let _ = this
.repo_snapshots
.unbounded_send((repo_id, snapshot.clone()));
}
}
});
this.executor.spawn({
let this = weak_self.clone();
async move {
while let Some((repo_id, snapshot)) = repo_snapshots_rx.next().await {
if let Some(this) = this.upgrade() {
snapshot.save(repo_id, &*this.db.kv).await.log_err();
} else {
break;
}
}
}
});
this
})
}
@@ -680,6 +706,7 @@ pub struct Db {
kv: Arc<dyn KvStore>,
snapshot: Arc<Mutex<DbSnapshot>>,
local_operation_created: Option<Arc<dyn Send + Sync + Fn(RepoId, Operation)>>,
repo_snapshot_changed: Option<Arc<dyn Send + Sync + Fn(RepoId, &RepoSnapshot)>>,
}
impl Db {
@@ -688,14 +715,19 @@ impl Db {
kv,
snapshot: Default::default(),
local_operation_created: None,
repo_snapshot_changed: None,
}
}
fn on_local_operation(
fn on_local_operation(&mut self, callback: impl 'static + Send + Sync + Fn(RepoId, Operation)) {
self.local_operation_created = Some(Arc::new(callback));
}
fn on_repo_snapshot_changed(
&mut self,
operation_created: impl 'static + Send + Sync + Fn(RepoId, Operation),
callback: impl 'static + Send + Sync + Fn(RepoId, &RepoSnapshot),
) {
self.local_operation_created = Some(Arc::new(operation_created));
self.repo_snapshot_changed = Some(Arc::new(callback));
}
fn repo(&self, id: RepoId) -> Option<Repo> {
@@ -788,6 +820,10 @@ impl Repo {
}
}
if let Some(repo_snapshot_changed) = self.db.repo_snapshot_changed.as_ref() {
repo_snapshot_changed(self.id, repo);
}
result
})
.expect("repo must exist")
@@ -1550,7 +1586,6 @@ struct SavedRepoSnapshot {
last_operation_id: OperationId,
branches: btree::SavedId,
operations: btree::SavedId,
revisions: btree::SavedId,
max_operation_ids: btree::SavedId,
deferred_operations: btree::SavedId,
}
@@ -1595,6 +1630,20 @@ impl RepoSnapshot {
})
}
async fn save(&self, id: RepoId, kv: &dyn KvStore) -> Result<()> {
let saved_repo = SavedRepoSnapshot {
last_operation_id: self.last_operation_id,
branches: self.branches.save(kv).await?,
operations: self.operations.save(kv).await?,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
};
let repo_bytes = serde_bare::to_vec(&saved_repo)?;
kv.store(btree::namespace_bytes("repo"), id.0.as_u128(), repo_bytes)
.await?;
Ok(())
}
fn create_empty_branch(&mut self, name: impl Into<Arc<str>>) -> (Operation, OperationId) {
let name = name.into();
let branch_id = self.last_operation_id.tick();