Compare commits


1 commit

Author: Nathan Sobo
SHA1: e316f657bc
Date: 2024-07-03 14:09:56 -06:00

WIP: Start promoting crdb crate from old crdb branch

A lot has changed, so rather than merging, I tried to just pull it forward.
19 changed files with 10020 additions and 3 deletions

Cargo.lock (generated; 85 lines changed)

@@ -306,6 +306,9 @@ name = "arrayvec"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
dependencies = [
"serde",
]
[[package]]
name = "as-raw-xcb-connection"
@@ -475,6 +478,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-broadcast"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d26004fe83b2d1cd3a97609b21e39f9a31535822210fe83205d2ce48866ea61"
dependencies = [
"event-listener 2.5.3",
"futures-core",
"parking_lot",
]
[[package]]
name = "async-broadcast"
version = "0.7.0"
@@ -1788,6 +1802,18 @@ dependencies = [
"workspace",
]
[[package]]
name = "bromberg_sl2"
version = "0.6.0"
source = "git+https://github.com/zed-industries/bromberg_sl2?rev=6faf816bd5b4b7b2b6ea77495686634732ded095#6faf816bd5b4b7b2b6ea77495686634732ded095"
dependencies = [
"digest 0.9.0",
"lazy_static",
"rayon",
"seq-macro",
"serde",
]
[[package]]
name = "bstr"
version = "1.6.2"
@@ -3058,6 +3084,33 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crdb"
version = "0.1.0"
dependencies = [
"anyhow",
"arrayvec",
"async-broadcast 0.4.1",
"bromberg_sl2",
"collections",
"ctor",
"env_logger",
"futures 0.3.28",
"gpui",
"lazy_static",
"log",
"parking_lot",
"portable-atomic",
"rand 0.8.5",
"serde",
"serde_bare",
"serde_json",
"smallvec",
"smol",
"util",
"uuid",
]
[[package]]
name = "criterion"
version = "0.4.0"
@@ -6245,7 +6298,7 @@ name = "live_kit_client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-broadcast",
"async-broadcast 0.7.0",
"async-trait",
"collections",
"core-foundation",
@@ -7888,6 +7941,15 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
[[package]]
name = "portable-atomic"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
dependencies = [
"serde",
]
[[package]]
name = "postage"
version = "0.5.0"
@@ -9490,6 +9552,12 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
[[package]]
name = "seq-macro"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9f47faea3cad316faa914d013d24f471cd90bfca1a0c70f05a3f42c6441e99"
[[package]]
name = "serde"
version = "1.0.202"
@@ -9499,6 +9567,15 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_bare"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51c55386eed0f1ae957b091dc2ca8122f287b60c79c774cbe3d5f2b69fded660"
dependencies = [
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.202"
@@ -9876,6 +9953,9 @@ name = "smallvec"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a"
dependencies = [
"serde",
]
[[package]]
name = "smol"
@@ -11926,6 +12006,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom 0.2.10",
"rand 0.8.5",
"serde",
"sha1_smol",
]
@@ -13547,7 +13628,7 @@ version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b8e3d6ae3342792a6cc2340e4394334c7402f3d793b390d2c5494a4032b3030"
dependencies = [
"async-broadcast",
"async-broadcast 0.7.0",
"async-executor",
"async-fs 2.1.1",
"async-io 2.3.1",

Cargo.toml

@@ -20,6 +20,7 @@ members = [
"crates/command_palette",
"crates/command_palette_hooks",
"crates/copilot",
"crates/crdb",
"crates/db",
"crates/dev_server_projects",
"crates/diagnostics",
@@ -359,7 +360,7 @@ shellexpand = "2.1.0"
shlex = "1.3.0"
signal-hook = "0.3.17"
similar = "1.3"
smallvec = { version = "1.6", features = ["union"] }
smallvec = { version = "1.6", features = ["union", "serde"] }
smol = "1.2"
strum = { version = "0.25.0", features = ["derive"] }
subtle = "2.5.0"

crates/crdb/Cargo.toml (new file; 40 lines)

@@ -0,0 +1,40 @@
[package]
name = "crdb"
version = "0.1.0"
edition = "2021"
[lib]
path = "src/crdb.rs"
doctest = false
[features]
test-support = ["collections/test-support", "util/test-support"]
[dependencies]
collections = { path = "../collections" }
util = { path = "../util" }
anyhow.workspace = true
arrayvec = { version = "0.7.1", features = ["serde"] }
bromberg_sl2 = { git = "https://github.com/zed-industries/bromberg_sl2", rev = "6faf816bd5b4b7b2b6ea77495686634732ded095" }
futures.workspace = true
lazy_static.workspace = true
log.workspace = true
parking_lot.workspace = true
portable-atomic = { version = "1", features = ["serde"] }
serde.workspace = true
serde_json.workspace = true
serde_bare = "0.5"
smallvec.workspace = true
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
[dev-dependencies]
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }
async-broadcast = "0.4"
ctor.workspace = true
env_logger.workspace = true
rand.workspace = true
smol.workspace = true

crates/crdb/src/README.md (new file; 35 lines)

@@ -0,0 +1,35 @@
# CRDB: A conflict-free replicated database for code and markdown
Our goal is for this database to contain all the text inserted in Zed.
## Contexts
The database is divided into *contexts*, with each context containing a collection of *documents*.
### Contexts contain documents
These contexts and the documents they contain are really just namespaces in a global table of document *fragments*. Each fragment is a sequence of one or more characters, which may or may not be visible in a given branch.
#### Documents with paths are files
Documents in a context can be associated with metadata. If a document is associated with a relative path, it represents a file. A context that contains files can be synchronized with a directory tree on the file system, much like a Git repository.
#### Conversations are also documents
Contexts can also be associated with conversations, which are special documents that embed other documents that represent messages. Messages are embedded via a mechanism called *portals*, which will be discussed further below.
### Contexts occupy a hierarchical namespace
For example, at genesis, zed.dev will contain the following channels:
#zed
- This is where people get oriented about what Zed is all about. We'll link to it from our landing page.
#zed/staff
- Here's where we talk about stuff private to the company, and host company-specific files.
#zed/insiders
- Users we've worked with.
#zed/zed
- This contains the actual source code for Zed.
- It also has a conversation where potential contributors can engage with us and each other.
#zed/zed/debugger
- A subcontext of zed/zed where we talk about and eventually implement a debugger. Associated with a different branch of zed/zed where the debugger is being built, but could also have multiple branches. Branches and contexts are independent.
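
As a rough sketch of the model described above (the types and names here are illustrative assumptions, not the definitions in this crate):

```rust
// Hypothetical shapes for the concepts in this README; the real crate
// stores fragments in copy-on-write btree sequences, not Vecs.
use std::collections::BTreeMap;

struct Fragment {
    text: String,  // a sequence of one or more characters
    visible: bool, // visibility may differ between branches
}

struct Document {
    path: Option<String>, // a relative path makes the document a file
    fragments: Vec<Fragment>,
}

struct Context {
    documents: BTreeMap<u64, Document>,
}

fn main() {
    // Contexts occupy a hierarchical namespace, e.g. "zed/zed/debugger".
    let mut contexts: BTreeMap<String, Context> = BTreeMap::new();
    contexts.insert(
        "zed/zed".into(),
        Context { documents: BTreeMap::new() },
    );
    assert!(contexts.contains_key("zed/zed"));
}
```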

crates/crdb/src/btree.rs (new file; 1957 lines)

File diff suppressed because it is too large.

@@ -0,0 +1,755 @@
use super::*;
use arrayvec::ArrayVec;
use std::{cmp::Ordering, mem, sync::Arc};
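/// One level of a cursor's descent path: the subtree being visited, the
/// index of the current child within it, and the dimension value
/// accumulated up to the start of that child.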
#[derive(Clone)]
struct StackEntry<'a, T: Item, D> {
tree: &'a Sequence<T>,
index: usize,
position: D,
}
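/// A stateful cursor over a `Sequence` that tracks an aggregated dimension
/// `D` while seeking and iterating. `seek`, `next`, or `prev` must be
/// called before reading the current item.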
#[derive(Clone)]
pub struct Cursor<'a, T: Item, D> {
tree: &'a Sequence<T>,
stack: ArrayVec<StackEntry<'a, T, D>, 16>,
position: D,
did_seek: bool,
at_end: bool,
}
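/// A forward iterator over a `Sequence` that silently skips subtrees which
/// are not currently loaded.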
pub struct Iter<'a, T: Item> {
tree: &'a Sequence<T>,
stack: ArrayVec<StackEntry<'a, T, ()>, 16>,
}
impl<'a, T, D> Cursor<'a, T, D>
where
T: Item,
D: Dimension<'a, T::Summary>,
{
pub fn new(tree: &'a Sequence<T>) -> Self {
Self {
tree,
stack: ArrayVec::new(),
position: D::default(),
did_seek: false,
at_end: tree.is_empty(),
}
}
fn reset(&mut self) {
self.did_seek = false;
self.at_end = self.tree.is_empty();
self.stack.truncate(0);
self.position = D::default();
}
pub fn start(&self) -> &D {
&self.position
}
pub fn end(&self, cx: &<T::Summary as Summary>::Context) -> D {
if let Some(item_summary) = self.item_summary() {
let mut end = self.start().clone();
end.add_summary(item_summary, cx);
end
} else {
self.start().clone()
}
}
pub fn item(&self) -> Option<&'a T> {
self.assert_did_seek();
if let Some(entry) = self.stack.last() {
match *entry.tree.0 {
Node::Leaf { ref items, .. } => {
if entry.index == items.len() {
None
} else {
Some(&items[entry.index])
}
}
_ => unreachable!(),
}
} else {
None
}
}
pub fn item_summary(&self) -> Option<&'a T::Summary> {
self.assert_did_seek();
if let Some(entry) = self.stack.last() {
match *entry.tree.0 {
Node::Leaf {
ref item_summaries, ..
} => {
if entry.index == item_summaries.len() {
None
} else {
Some(&item_summaries[entry.index])
}
}
_ => unreachable!(),
}
} else {
None
}
}
pub fn prev_item(&self) -> Option<&'a T> {
self.assert_did_seek();
if let Some(entry) = self.stack.last() {
if entry.index == 0 {
if let Some(prev_leaf) = self.prev_leaf() {
Some(prev_leaf.0.items().last().unwrap())
} else {
None
}
} else {
match *entry.tree.0 {
Node::Leaf { ref items, .. } => Some(&items[entry.index - 1]),
_ => unreachable!(),
}
}
} else if self.at_end {
self.tree.last()
} else {
None
}
}
fn prev_leaf(&self) -> Option<&'a Sequence<T>> {
for entry in self.stack.iter().rev().skip(1) {
if entry.index != 0 {
match *entry.tree.0 {
Node::Internal {
ref child_trees, ..
} => {
for tree in child_trees[..entry.index].iter().rev() {
if let ChildTree::Loaded { tree } = tree {
if let Some(leaf) = tree.rightmost_leaf() {
return Some(leaf);
}
}
}
}
Node::Leaf { .. } => unreachable!(),
};
}
}
None
}
pub fn prev(&mut self, cx: &<T::Summary as Summary>::Context) {
self.prev_internal(|_| true, cx)
}
fn prev_internal<F>(&mut self, mut filter_node: F, cx: &<T::Summary as Summary>::Context)
where
F: FnMut(&T::Summary) -> bool,
{
if !self.did_seek {
self.did_seek = true;
self.at_end = true;
}
if self.at_end {
self.position = D::default();
self.at_end = self.tree.is_empty();
if !self.tree.is_empty() {
self.stack.push(StackEntry {
tree: self.tree,
index: self.tree.0.child_summaries().len(),
position: D::from_summary(self.tree.summary(), cx),
});
}
}
let mut descending = false;
while !self.stack.is_empty() {
if let Some(StackEntry { position, .. }) = self.stack.iter().rev().nth(1) {
self.position = position.clone();
} else {
self.position = D::default();
}
let entry = self.stack.last_mut().unwrap();
if !descending {
if entry.index == 0 {
self.stack.pop();
continue;
} else {
entry.index -= 1;
}
}
for summary in &entry.tree.0.child_summaries()[..entry.index] {
self.position.add_summary(summary, cx);
}
entry.position = self.position.clone();
descending = filter_node(&entry.tree.0.child_summaries()[entry.index]);
match entry.tree.0.as_ref() {
Node::Internal { child_trees, .. } => {
if descending {
if let ChildTree::Loaded { tree } = &child_trees[entry.index] {
self.stack.push(StackEntry {
position: D::default(),
tree,
index: tree.0.child_summaries().len() - 1,
});
} else {
descending = false;
}
}
}
Node::Leaf { .. } => {
if descending {
break;
}
}
}
}
}
pub fn next(&mut self, cx: &<T::Summary as Summary>::Context) {
self.next_internal(|_| true, cx)
}
fn next_internal<F>(&mut self, mut filter_node: F, cx: &<T::Summary as Summary>::Context)
where
F: FnMut(&T::Summary) -> bool,
{
let mut descend = false;
if self.stack.is_empty() {
if !self.at_end {
self.stack.push(StackEntry {
tree: self.tree,
index: 0,
position: D::default(),
});
descend = true;
}
self.did_seek = true;
}
while !self.stack.is_empty() {
let new_subtree = {
let entry = self.stack.last_mut().unwrap();
match entry.tree.0.as_ref() {
Node::Internal {
child_trees,
child_summaries,
..
} => {
if !descend {
entry.index += 1;
entry.position = self.position.clone();
}
while entry.index < child_summaries.len() {
let next_summary = &child_summaries[entry.index];
if filter_node(next_summary) && child_trees[entry.index].is_loaded() {
break;
} else {
entry.index += 1;
entry.position.add_summary(next_summary, cx);
self.position.add_summary(next_summary, cx);
}
}
child_trees.get(entry.index)
}
Node::Leaf { item_summaries, .. } => {
if !descend {
let item_summary = &item_summaries[entry.index];
entry.index += 1;
entry.position.add_summary(item_summary, cx);
self.position.add_summary(item_summary, cx);
}
loop {
if let Some(next_item_summary) = item_summaries.get(entry.index) {
if filter_node(next_item_summary) {
return;
} else {
entry.index += 1;
entry.position.add_summary(next_item_summary, cx);
self.position.add_summary(next_item_summary, cx);
}
} else {
break None;
}
}
}
}
};
if let Some(subtree) = new_subtree {
let subtree = if let ChildTree::Loaded { tree } = subtree {
tree
} else {
unreachable!()
};
descend = true;
self.stack.push(StackEntry {
tree: subtree,
index: 0,
position: self.position.clone(),
});
} else {
descend = false;
self.stack.pop();
}
}
self.at_end = self.stack.is_empty();
debug_assert!(self.stack.is_empty() || self.stack.last().unwrap().tree.0.is_leaf());
}
fn assert_did_seek(&self) {
assert!(
self.did_seek,
"Must call `seek`, `next` or `prev` before calling this method"
);
}
}
impl<'a, T, D> Cursor<'a, T, D>
where
T: Item,
D: Dimension<'a, T::Summary>,
{
pub fn seek<Target>(
&mut self,
pos: &Target,
bias: Bias,
cx: &<T::Summary as Summary>::Context,
) -> bool
where
Target: SeekTarget<'a, T::Summary, D>,
{
self.reset();
self.seek_internal(pos, bias, &mut (), cx)
}
pub fn seek_forward<Target>(
&mut self,
pos: &Target,
bias: Bias,
cx: &<T::Summary as Summary>::Context,
) -> bool
where
Target: SeekTarget<'a, T::Summary, D>,
{
self.seek_internal(pos, bias, &mut (), cx)
}
pub fn slice<Target>(
&mut self,
end: &Target,
bias: Bias,
cx: &<T::Summary as Summary>::Context,
) -> Sequence<T>
where
Target: SeekTarget<'a, T::Summary, D>,
{
let mut slice = SliceSeekAggregate {
tree: Sequence::new(),
leaf_items: ArrayVec::new(),
leaf_item_summaries: ArrayVec::new(),
leaf_summary: T::Summary::default(),
};
self.seek_internal(end, bias, &mut slice, cx);
slice.tree
}
pub fn suffix(&mut self, cx: &<T::Summary as Summary>::Context) -> Sequence<T> {
self.slice(&End::new(), Bias::Right, cx)
}
pub fn summary<Target, Output>(
&mut self,
end: &Target,
bias: Bias,
cx: &<T::Summary as Summary>::Context,
) -> Output
where
Target: SeekTarget<'a, T::Summary, D>,
Output: Dimension<'a, T::Summary>,
{
let mut summary = SummarySeekAggregate(Output::default());
self.seek_internal(end, bias, &mut summary, cx);
summary.0
}
fn seek_internal(
&mut self,
target: &dyn SeekTarget<'a, T::Summary, D>,
bias: Bias,
aggregate: &mut dyn SeekAggregate<'a, T>,
cx: &<T::Summary as Summary>::Context,
) -> bool {
debug_assert!(
target.seek_cmp(&self.position, cx) >= Ordering::Equal,
"cannot seek backward from {:?} to {:?}",
self.position,
target
);
if !self.did_seek {
self.did_seek = true;
self.stack.push(StackEntry {
tree: self.tree,
index: 0,
position: Default::default(),
});
}
let mut ascending = false;
'outer: while let Some(entry) = self.stack.last_mut() {
match *entry.tree.0 {
Node::Internal {
ref child_summaries,
ref child_trees,
..
} => {
if ascending {
entry.index += 1;
entry.position = self.position.clone();
}
for (child_tree, child_summary) in child_trees[entry.index..]
.iter()
.zip(&child_summaries[entry.index..])
{
let mut child_end = self.position.clone();
child_end.add_summary(child_summary, cx);
let comparison = target.seek_cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == Bias::Right)
|| !child_tree.is_loaded()
{
self.position = child_end;
aggregate.push_tree(child_tree, child_summary, cx);
entry.index += 1;
entry.position = self.position.clone();
} else {
let child_tree = if let ChildTree::Loaded { tree } = child_tree {
tree
} else {
unreachable!()
};
self.stack.push(StackEntry {
tree: child_tree,
index: 0,
position: self.position.clone(),
});
ascending = false;
continue 'outer;
}
}
}
Node::Leaf {
ref items,
ref item_summaries,
..
} => {
aggregate.begin_leaf();
for (item, item_summary) in items[entry.index..]
.iter()
.zip(&item_summaries[entry.index..])
{
let mut child_end = self.position.clone();
child_end.add_summary(item_summary, cx);
let comparison = target.seek_cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == Bias::Right)
{
self.position = child_end;
aggregate.push_item(item, item_summary, cx);
entry.index += 1;
} else {
aggregate.end_leaf(cx);
break 'outer;
}
}
aggregate.end_leaf(cx);
}
}
self.stack.pop();
ascending = true;
}
self.at_end = self.stack.is_empty();
debug_assert!(self.stack.is_empty() || self.stack.last().unwrap().tree.0.is_leaf());
let mut end = self.position.clone();
if bias == Bias::Left {
if let Some(summary) = self.item_summary() {
end.add_summary(summary, cx);
}
}
target.seek_cmp(&end, cx) == Ordering::Equal
}
}
impl<'a, T: Item> Iter<'a, T> {
pub(crate) fn new(tree: &'a Sequence<T>) -> Self {
Self {
tree,
stack: Default::default(),
}
}
}
impl<'a, T: Item> Iterator for Iter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
let mut descend = false;
if self.stack.is_empty() {
self.stack.push(StackEntry {
tree: self.tree,
index: 0,
position: (),
});
descend = true;
}
while !self.stack.is_empty() {
let new_subtree = {
let entry = self.stack.last_mut().unwrap();
match entry.tree.0.as_ref() {
Node::Internal { child_trees, .. } => {
if !descend {
entry.index += 1;
}
while entry.index < child_trees.len() {
if child_trees[entry.index].is_loaded() {
break;
}
entry.index += 1;
}
child_trees.get(entry.index)
}
Node::Leaf { items, .. } => {
if !descend {
entry.index += 1;
}
if let Some(next_item) = items.get(entry.index) {
return Some(next_item);
} else {
None
}
}
}
};
if let Some(subtree) = new_subtree {
let subtree = if let ChildTree::Loaded { tree } = subtree {
tree
} else {
unreachable!()
};
descend = true;
self.stack.push(StackEntry {
tree: subtree,
index: 0,
position: (),
});
} else {
descend = false;
self.stack.pop();
}
}
None
}
}
impl<'a, T, S, D> Iterator for Cursor<'a, T, D>
where
T: Item<Summary = S>,
S: Summary<Context = ()>,
D: Dimension<'a, T::Summary>,
{
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if !self.did_seek {
self.next(&());
}
if let Some(item) = self.item() {
self.next(&());
Some(item)
} else {
None
}
}
}
pub struct FilterCursor<'a, F, T: Item, D> {
cursor: Cursor<'a, T, D>,
filter_node: F,
}
impl<'a, F, T, D> FilterCursor<'a, F, T, D>
where
F: FnMut(&T::Summary) -> bool,
T: Item,
D: Dimension<'a, T::Summary>,
{
pub fn new(tree: &'a Sequence<T>, filter_node: F) -> Self {
let cursor = tree.cursor::<D>();
Self {
cursor,
filter_node,
}
}
pub fn start(&self) -> &D {
self.cursor.start()
}
pub fn end(&self, cx: &<T::Summary as Summary>::Context) -> D {
self.cursor.end(cx)
}
pub fn item(&self) -> Option<&'a T> {
self.cursor.item()
}
pub fn item_summary(&self) -> Option<&'a T::Summary> {
self.cursor.item_summary()
}
pub fn next(&mut self, cx: &<T::Summary as Summary>::Context) {
self.cursor.next_internal(&mut self.filter_node, cx);
}
pub fn prev(&mut self, cx: &<T::Summary as Summary>::Context) {
self.cursor.prev_internal(&mut self.filter_node, cx);
}
}
impl<'a, F, T, S, U> Iterator for FilterCursor<'a, F, T, U>
where
F: FnMut(&T::Summary) -> bool,
T: Item<Summary = S>,
S: Summary<Context = ()>, // Context must be the unit type, since `Iterator::next` takes no context argument
U: Dimension<'a, T::Summary>,
{
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if !self.cursor.did_seek {
self.next(&());
}
if let Some(item) = self.item() {
self.cursor.next_internal(&mut self.filter_node, &());
Some(item)
} else {
None
}
}
}
trait SeekAggregate<'a, T: Item> {
fn begin_leaf(&mut self);
fn end_leaf(&mut self, cx: &<T::Summary as Summary>::Context);
fn push_item(
&mut self,
item: &'a T,
summary: &'a T::Summary,
cx: &<T::Summary as Summary>::Context,
);
fn push_tree(
&mut self,
tree: &'a ChildTree<T>,
summary: &'a T::Summary,
cx: &<T::Summary as Summary>::Context,
);
}
struct SliceSeekAggregate<T: Item> {
tree: Sequence<T>,
leaf_items: ArrayVec<T, { 2 * TREE_BASE }>,
leaf_item_summaries: ArrayVec<T::Summary, { 2 * TREE_BASE }>,
leaf_summary: T::Summary,
}
struct SummarySeekAggregate<D>(D);
impl<'a, T: Item> SeekAggregate<'a, T> for () {
fn begin_leaf(&mut self) {}
fn end_leaf(&mut self, _: &<T::Summary as Summary>::Context) {}
fn push_item(&mut self, _: &T, _: &T::Summary, _: &<T::Summary as Summary>::Context) {}
fn push_tree(
&mut self,
_: &ChildTree<T>,
_: &T::Summary,
_: &<T::Summary as Summary>::Context,
) {
}
}
impl<'a, T: Item> SeekAggregate<'a, T> for SliceSeekAggregate<T> {
fn begin_leaf(&mut self) {}
fn end_leaf(&mut self, cx: &<T::Summary as Summary>::Context) {
self.tree.append(
Sequence(Arc::new(Node::Leaf {
saved_id: SavedId::default(),
summary: mem::take(&mut self.leaf_summary),
items: mem::take(&mut self.leaf_items),
item_summaries: mem::take(&mut self.leaf_item_summaries),
})),
cx,
);
}
fn push_item(&mut self, item: &T, summary: &T::Summary, cx: &<T::Summary as Summary>::Context) {
self.leaf_items.push(item.clone());
self.leaf_item_summaries.push(summary.clone());
Summary::add_summary(&mut self.leaf_summary, summary, cx);
}
fn push_tree(
&mut self,
tree: &ChildTree<T>,
summary: &T::Summary,
cx: &<T::Summary as Summary>::Context,
) {
self.tree.append_internal(tree.clone(), summary.clone(), cx);
}
}
impl<'a, T: Item, D> SeekAggregate<'a, T> for SummarySeekAggregate<D>
where
D: Dimension<'a, T::Summary>,
{
fn begin_leaf(&mut self) {}
fn end_leaf(&mut self, _: &<T::Summary as Summary>::Context) {}
fn push_item(&mut self, _: &T, summary: &'a T::Summary, cx: &<T::Summary as Summary>::Context) {
self.0.add_summary(summary, cx);
}
fn push_tree(
&mut self,
_: &ChildTree<T>,
summary: &'a T::Summary,
cx: &<T::Summary as Summary>::Context,
) {
self.0.add_summary(summary, cx);
}
}
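
The `Bias` passed to `seek` and `slice` determines whether a seek that lands exactly on the target stops before (`Bias::Left`) or after (`Bias::Right`) the items equal to it. As a standalone analogy only (this uses std's `partition_point`, not this crate's API), the same distinction appears when bisecting a sorted slice:

```rust
// Left vs. right bias when seeking to a repeated value in sorted data.
fn main() {
    let items = [1, 3, 3, 3, 7];
    let target = 3;
    // "Left" bias: position before the run of items equal to the target.
    let left = items.partition_point(|&x| x < target);
    // "Right" bias: position after the run of items equal to the target.
    let right = items.partition_point(|&x| x <= target);
    assert_eq!((left, right), (1, 4));
}
```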


@@ -0,0 +1,594 @@
use super::{
Bias, Dimension, Edit, Item, KeyedItem, KvStore, SavedId, SeekTarget, Sequence, Summary,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::{self, Debug},
ops::{Bound, RangeBounds},
};
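/// An ordered, copy-on-write map built on the btree `Sequence`. Entries
/// are kept sorted by key, and subtrees can be loaded on demand from a
/// `KvStore` via the async `load*` methods.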
#[derive(Clone, PartialEq, Eq)]
pub struct Map<K, V>(Sequence<MapEntry<K, V>>)
where
K: Clone + Debug + Ord,
V: Clone + Debug;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MapEntry<K, V> {
key: K,
value: V,
}
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MapKey<K>(Option<K>);
impl<K> Default for MapKey<K> {
fn default() -> Self {
Self(None)
}
}
#[derive(Clone, Debug)]
pub struct MapKeyRef<'a, K>(Option<&'a K>);
impl<K> Default for MapKeyRef<'_, K> {
fn default() -> Self {
Self(None)
}
}
#[derive(Clone)]
pub struct Set<K>(Map<K, ()>)
where
K: Clone + Debug + Ord;
impl<K, V> Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
pub fn ptr_eq(this: &Self, other: &Self) -> bool {
Sequence::ptr_eq(&this.0, &other.0)
}
pub fn from_ordered_entries(entries: impl IntoIterator<Item = (K, V)>) -> Self {
let tree = Sequence::from_iter(
entries
.into_iter()
.map(|(key, value)| MapEntry { key, value }),
&(),
);
Self(tree)
}
pub async fn load_root(id: SavedId, kv: &dyn KvStore) -> Result<Self>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
Ok(Self(Sequence::load_root(id, kv).await?))
}
pub async fn load_all(id: SavedId, kv: &dyn KvStore) -> Result<Self>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
let mut sequence = Sequence::load_root(id, kv).await?;
sequence.load(kv, &(), |_| true).await?;
Ok(Self(sequence))
}
pub async fn load(&mut self, key: &K, kv: &dyn KvStore) -> Result<Option<&V>>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(probe.start.0.as_ref()),
Bound::Included(probe.summary.0.as_ref()),
);
key_range.contains(&Some(key))
})
.await?;
Ok(self.get(key))
}
pub async fn load_from(
&mut self,
start: &K,
kv: &dyn KvStore,
) -> Result<impl Iterator<Item = (&K, &V)>>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0
.load(kv, &(), |probe| {
probe.start.0.as_ref() >= Some(&start) || probe.summary.0.as_ref() >= Some(&start)
})
.await?;
Ok(self.iter_from(start))
}
pub async fn store(&mut self, key: K, value: V, kv: &dyn KvStore) -> Result<()>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(probe.start.0.as_ref()),
Bound::Included(probe.summary.0.as_ref()),
);
key_range.contains(&Some(&key))
})
.await?;
self.insert(key, value);
Ok(())
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedId>
where
K: Serialize + for<'de> Deserialize<'de>,
V: Serialize + for<'de> Deserialize<'de>,
{
self.0.save(kv).await
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn get<'a>(&self, key: &'a K) -> Option<&V> {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
cursor.seek(&MapKeyRef(Some(key)), Bias::Left, &());
if let Some(item) = cursor.item() {
if key == &item.key {
Some(&item.value)
} else {
None
}
} else {
None
}
}
pub fn contains_key<'a>(&self, key: &'a K) -> bool {
self.get(key).is_some()
}
pub fn insert(&mut self, key: K, value: V) {
self.0.insert_or_replace(MapEntry { key, value }, &());
}
pub fn remove(&mut self, key: &K) -> Option<V> {
let mut removed = None;
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let key = MapKeyRef(Some(key));
let mut new_tree = cursor.slice(&key, Bias::Left, &());
if key.seek_cmp(&cursor.end(&()), &()) == Ordering::Equal {
removed = Some(cursor.item().unwrap().value.clone());
cursor.next(&());
}
new_tree.append(cursor.suffix(&()), &());
drop(cursor);
self.0 = new_tree;
removed
}
pub fn remove_range(&mut self, start: &impl MapSeekTarget<K>, end: &impl MapSeekTarget<K>) {
let start = MapSeekTargetAdaptor(start);
let end = MapSeekTargetAdaptor(end);
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let mut new_tree = cursor.slice(&start, Bias::Left, &());
cursor.seek(&end, Bias::Left, &());
new_tree.append(cursor.suffix(&()), &());
drop(cursor);
self.0 = new_tree;
}
/// Returns the key-value pair with the greatest key less than or equal to the given key.
pub fn closest(&self, key: &K) -> Option<(&K, &V)> {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let key = MapKeyRef(Some(key));
cursor.seek(&key, Bias::Right, &());
cursor.prev(&());
cursor.item().map(|item| (&item.key, &item.value))
}
pub fn iter_from<'a>(&self, from: &'a K) -> impl Iterator<Item = (&K, &V)> {
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let from_key = MapKeyRef(Some(from));
cursor.seek(&from_key, Bias::Left, &());
cursor
.into_iter()
.map(|map_entry| (&map_entry.key, &map_entry.value))
}
pub fn update<F, T>(&mut self, key: &K, f: F) -> Option<T>
where
F: FnOnce(&mut V) -> T,
{
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
let key = MapKeyRef(Some(key));
let mut new_tree = cursor.slice(&key, Bias::Left, &());
let mut result = None;
if key.seek_cmp(&cursor.end(&()), &()) == Ordering::Equal {
let mut updated = cursor.item().unwrap().clone();
result = Some(f(&mut updated.value));
new_tree.push(updated, &());
cursor.next(&());
}
new_tree.append(cursor.suffix(&()), &());
drop(cursor);
self.0 = new_tree;
result
}
pub fn retain<F: FnMut(&K, &V) -> bool>(&mut self, mut predicate: F) {
let mut new_map = Sequence::<MapEntry<K, V>>::default();
let mut cursor = self.0.cursor::<MapKeyRef<'_, K>>();
cursor.next(&());
while let Some(item) = cursor.item() {
if predicate(&item.key, &item.value) {
new_map.push(item.clone(), &());
}
cursor.next(&());
}
drop(cursor);
self.0 = new_map;
}
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> + '_ {
self.0.iter().map(|entry| (&entry.key, &entry.value))
}
pub fn values(&self) -> impl Iterator<Item = &V> + '_ {
self.0.iter().map(|entry| &entry.value)
}
pub fn insert_tree(&mut self, other: Map<K, V>) {
let edits = other
.iter()
.map(|(key, value)| {
Edit::Insert(MapEntry {
key: key.to_owned(),
value: value.to_owned(),
})
})
.collect();
self.0.edit(edits, &());
}
}
impl<K, V> Into<BTreeMap<K, V>> for &Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn into(self) -> BTreeMap<K, V> {
self.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}
}
impl<K, V> From<&BTreeMap<K, V>> for Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn from(value: &BTreeMap<K, V>) -> Self {
Map::from_ordered_entries(value.into_iter().map(|(k, v)| (k.clone(), v.clone())))
}
}
impl<K, V> Debug for Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map().entries(self.iter()).finish()
}
}
impl<T> Debug for Set<T>
where
T: Clone + Debug + Ord,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_set().entries(self.iter()).finish()
}
}
#[derive(Debug)]
struct MapSeekTargetAdaptor<'a, T>(&'a T);
impl<'a, K: Debug + Clone + Ord, T: MapSeekTarget<K>> SeekTarget<'a, MapKey<K>, MapKeyRef<'a, K>>
for MapSeekTargetAdaptor<'_, T>
{
fn seek_cmp(&self, cursor_location: &MapKeyRef<K>, _: &()) -> Ordering {
if let Some(key) = &cursor_location.0 {
MapSeekTarget::cmp_cursor(self.0, key)
} else {
Ordering::Greater
}
}
}
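/// A seek target that can be compared against map keys without itself
/// being a key, letting operations like `remove_range` use custom
/// boundaries (see the `PathDescendants` target in the tests below).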
pub trait MapSeekTarget<K>: Debug {
fn cmp_cursor(&self, cursor_location: &K) -> Ordering;
}
impl<K: Debug + Ord> MapSeekTarget<K> for K {
fn cmp_cursor(&self, cursor_location: &K) -> Ordering {
self.cmp(cursor_location)
}
}
impl<K, V> Default for Map<K, V>
where
K: Clone + Debug + Ord,
V: Clone + Debug,
{
fn default() -> Self {
Self(Default::default())
}
}
impl<K, V> Item for MapEntry<K, V>
where
K: Clone + Debug + Ord,
V: Clone,
{
type Summary = MapKey<K>;
fn summary(&self) -> Self::Summary {
self.key()
}
}
impl<K, V> KeyedItem for MapEntry<K, V>
where
K: Clone + Debug + Ord,
V: Clone,
{
type Key = MapKey<K>;
fn key(&self) -> Self::Key {
MapKey(Some(self.key.clone()))
}
}
impl<K> Summary for MapKey<K>
where
K: Clone + Debug,
{
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
*self = summary.clone()
}
}
impl<'a, K> Dimension<'a, MapKey<K>> for MapKeyRef<'a, K>
where
K: Clone + Debug + Ord,
{
fn add_summary(&mut self, summary: &'a MapKey<K>, _: &()) {
self.0 = summary.0.as_ref();
}
}
impl<'a, K> SeekTarget<'a, MapKey<K>, MapKeyRef<'a, K>> for MapKeyRef<'_, K>
where
K: Clone + Debug + Ord,
{
fn seek_cmp(&self, cursor_location: &MapKeyRef<K>, _: &()) -> Ordering {
Ord::cmp(&self.0, &cursor_location.0)
}
}
impl<K> Default for Set<K>
where
K: Clone + Debug + Ord,
{
fn default() -> Self {
Self(Default::default())
}
}
impl<K> Set<K>
where
K: Clone + Debug + Ord,
{
pub fn from_ordered_entries(entries: impl IntoIterator<Item = K>) -> Self {
Self(Map::from_ordered_entries(
entries.into_iter().map(|key| (key, ())),
))
}
pub fn insert(&mut self, key: K) {
self.0.insert(key, ());
}
pub fn contains(&self, key: &K) -> bool {
self.0.get(key).is_some()
}
pub fn iter(&self) -> impl Iterator<Item = &K> + '_ {
self.0.iter().map(|(k, _)| k)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic() {
let mut map = Map::default();
assert_eq!(map.iter().collect::<Vec<_>>(), vec![]);
map.insert(3, "c");
assert_eq!(map.get(&3), Some(&"c"));
assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&3, &"c")]);
map.insert(1, "a");
assert_eq!(map.get(&1), Some(&"a"));
assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a"), (&3, &"c")]);
map.insert(2, "b");
assert_eq!(map.get(&2), Some(&"b"));
assert_eq!(map.get(&1), Some(&"a"));
assert_eq!(map.get(&3), Some(&"c"));
assert_eq!(
map.iter().collect::<Vec<_>>(),
vec![(&1, &"a"), (&2, &"b"), (&3, &"c")]
);
assert_eq!(map.closest(&0), None);
assert_eq!(map.closest(&1), Some((&1, &"a")));
assert_eq!(map.closest(&10), Some((&3, &"c")));
map.remove(&2);
assert_eq!(map.get(&2), None);
assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a"), (&3, &"c")]);
assert_eq!(map.closest(&2), Some((&1, &"a")));
map.remove(&3);
assert_eq!(map.get(&3), None);
assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&1, &"a")]);
map.remove(&1);
assert_eq!(map.get(&1), None);
assert_eq!(map.iter().collect::<Vec<_>>(), vec![]);
map.insert(4, "d");
map.insert(5, "e");
map.insert(6, "f");
map.retain(|key, _| *key % 2 == 0);
assert_eq!(map.iter().collect::<Vec<_>>(), vec![(&4, &"d"), (&6, &"f")]);
}
#[test]
fn test_iter_from() {
let mut map = Map::default();
map.insert("a", 1);
map.insert("b", 2);
map.insert("baa", 3);
map.insert("baaab", 4);
map.insert("c", 5);
let result = map
.iter_from(&"ba")
.take_while(|(key, _)| key.starts_with(&"ba"))
.collect::<Vec<_>>();
assert_eq!(result.len(), 2);
assert!(result.iter().find(|(k, _)| k == &&"baa").is_some());
assert!(result.iter().find(|(k, _)| k == &&"baaab").is_some());
let result = map
.iter_from(&"c")
.take_while(|(key, _)| key.starts_with(&"c"))
.collect::<Vec<_>>();
assert_eq!(result.len(), 1);
assert!(result.iter().find(|(k, _)| k == &&"c").is_some());
}
#[test]
fn test_insert_tree() {
let mut map = Map::default();
map.insert("a", 1);
map.insert("b", 2);
map.insert("c", 3);
let mut other = Map::default();
other.insert("a", 2);
other.insert("b", 2);
other.insert("d", 4);
map.insert_tree(other);
assert_eq!(map.iter().count(), 4);
assert_eq!(map.get(&"a"), Some(&2));
assert_eq!(map.get(&"b"), Some(&2));
assert_eq!(map.get(&"c"), Some(&3));
assert_eq!(map.get(&"d"), Some(&4));
}
#[test]
fn test_remove_between_and_path_successor() {
use std::path::{Path, PathBuf};
#[derive(Debug)]
pub struct PathDescendants<'a>(&'a Path);
impl MapSeekTarget<PathBuf> for PathDescendants<'_> {
fn cmp_cursor(&self, key: &PathBuf) -> Ordering {
if key.starts_with(&self.0) {
Ordering::Greater
} else {
self.0.cmp(key)
}
}
}
let mut map = Map::default();
map.insert(PathBuf::from("a"), 1);
map.insert(PathBuf::from("a/a"), 1);
map.insert(PathBuf::from("b"), 2);
map.insert(PathBuf::from("b/a/a"), 3);
map.insert(PathBuf::from("b/a/a/a/b"), 4);
map.insert(PathBuf::from("c"), 5);
map.insert(PathBuf::from("c/a"), 6);
map.remove_range(
&PathBuf::from("b/a"),
&PathDescendants(&PathBuf::from("b/a")),
);
assert_eq!(map.get(&PathBuf::from("a")), Some(&1));
assert_eq!(map.get(&PathBuf::from("a/a")), Some(&1));
assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
assert_eq!(map.get(&PathBuf::from("b/a/a")), None);
assert_eq!(map.get(&PathBuf::from("b/a/a/a/b")), None);
assert_eq!(map.get(&PathBuf::from("c")), Some(&5));
assert_eq!(map.get(&PathBuf::from("c/a")), Some(&6));
map.remove_range(&PathBuf::from("c"), &PathDescendants(&PathBuf::from("c")));
assert_eq!(map.get(&PathBuf::from("a")), Some(&1));
assert_eq!(map.get(&PathBuf::from("a/a")), Some(&1));
assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
assert_eq!(map.get(&PathBuf::from("c")), None);
assert_eq!(map.get(&PathBuf::from("c/a")), None);
map.remove_range(&PathBuf::from("a"), &PathDescendants(&PathBuf::from("a")));
assert_eq!(map.get(&PathBuf::from("a")), None);
assert_eq!(map.get(&PathBuf::from("a/a")), None);
assert_eq!(map.get(&PathBuf::from("b")), Some(&2));
map.remove_range(&PathBuf::from("b"), &PathDescendants(&PathBuf::from("b")));
assert_eq!(map.get(&PathBuf::from("b")), None);
}
}

crates/crdb/src/crdb.rs (new file; 2840 lines)

File diff suppressed because it is too large.

crates/crdb/src/dense_id.rs (new file; 127 lines)

@@ -0,0 +1,127 @@
use crate::btree;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::iter;
lazy_static! {
static ref MIN: DenseId = DenseId::min();
static ref MAX: DenseId = DenseId::max();
}
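/// An identifier in a dense total order: `between` can always construct an
/// id that sorts between two distinct ids. Ids are variable-length
/// sequences of `u64`s compared lexicographically.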
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct DenseId(SmallVec<[u64; 4]>);
impl DenseId {
pub fn min() -> Self {
Self(smallvec![u64::MIN])
}
pub fn max() -> Self {
Self(smallvec![u64::MAX])
}
pub fn min_ref() -> &'static Self {
&*MIN
}
pub fn max_ref() -> &'static Self {
&*MAX
}
pub fn assign(&mut self, other: &Self) {
self.0.resize(other.0.len(), 0);
self.0.copy_from_slice(&other.0);
}
pub fn between(lhs: &Self, rhs: &Self) -> Self {
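// Pad lhs with implicit MIN components and rhs with implicit MAX
// components so a midpoint always exists. The `>> 48` below biases the
// midpoint toward lhs; when a component's gap is too small to split,
// the loop falls through to the next component.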
let lhs = lhs.0.iter().copied().chain(iter::repeat(u64::MIN));
let rhs = rhs.0.iter().copied().chain(iter::repeat(u64::MAX));
let mut location = SmallVec::new();
for (lhs, rhs) in lhs.zip(rhs) {
let mid = lhs + ((rhs.saturating_sub(lhs)) >> 48);
location.push(mid);
if mid > lhs {
break;
}
}
Self(location)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for DenseId {
fn default() -> Self {
Self::min()
}
}
impl btree::Item for DenseId {
type Summary = DenseId;
fn summary(&self) -> Self::Summary {
self.clone()
}
}
impl btree::KeyedItem for DenseId {
type Key = DenseId;
fn key(&self) -> Self::Key {
self.clone()
}
}
impl btree::Summary for DenseId {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
self.assign(summary);
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::prelude::*;
use std::mem;
#[gpui::test(iterations = 100)]
fn test_dense_id(mut rng: StdRng) {
let mut lhs = Default::default();
let mut rhs = Default::default();
while lhs == rhs {
lhs = DenseId(
(0..rng.gen_range(1..=5))
.map(|_| rng.gen_range(0..=100))
.collect(),
);
rhs = DenseId(
(0..rng.gen_range(1..=5))
.map(|_| rng.gen_range(0..=100))
.collect(),
);
}
if lhs > rhs {
mem::swap(&mut lhs, &mut rhs);
}
let middle = DenseId::between(&lhs, &rhs);
assert!(middle > lhs);
assert!(middle < rhs);
for ix in 0..middle.0.len() - 1 {
assert!(
middle.0[ix] == *lhs.0.get(ix).unwrap_or(&0)
|| middle.0[ix] == *rhs.0.get(ix).unwrap_or(&0)
);
}
}
}

crates/crdb/src/history.rs (new file; 699 lines)

@@ -0,0 +1,699 @@
use std::{cmp::Ordering, iter, ops::RangeBounds};
use crate::{
btree::{self, Bias, KvStore, SavedId},
messages::Operation,
OperationCount, OperationId, ReplicaId, RevisionId,
};
use anyhow::{anyhow, Result};
use collections::{BTreeSet, Bound, HashMap, HashSet, VecDeque};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
#[derive(Serialize, Deserialize)]
pub struct SavedHistory {
operations: SavedId,
next_operation_id: OperationId,
max_operation_ids: SavedId,
deferred_operations: SavedId,
}
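/// A replica's view of the operation history: all known operations indexed
/// by id, the highest operation count observed per replica, and operations
/// deferred because their parent revisions have not yet arrived.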
#[derive(Clone, Debug)]
pub struct History {
operations: btree::Map<OperationId, Operation>,
next_operation_id: OperationId,
max_operation_ids: btree::Map<ReplicaId, OperationCount>,
deferred_operations: btree::Sequence<DeferredOperation>,
}
impl History {
pub fn new(replica_id: ReplicaId) -> Self {
Self {
operations: Default::default(),
next_operation_id: OperationId::new(replica_id),
max_operation_ids: Default::default(),
deferred_operations: Default::default(),
}
}
pub fn ptr_eq(&self, other: &Self) -> bool {
btree::Map::ptr_eq(&self.operations, &other.operations)
&& btree::Map::ptr_eq(&self.max_operation_ids, &other.max_operation_ids)
&& btree::Sequence::ptr_eq(&self.deferred_operations, &other.deferred_operations)
&& self.next_operation_id == other.next_operation_id
}
pub async fn load(saved_history: SavedHistory, kv: &dyn KvStore) -> Result<Self> {
Ok(Self {
operations: btree::Map::load_root(saved_history.operations, kv).await?,
next_operation_id: saved_history.next_operation_id,
max_operation_ids: btree::Map::load_all(saved_history.max_operation_ids, kv).await?,
deferred_operations: btree::Sequence::load_root(saved_history.deferred_operations, kv)
.await?,
})
}
pub async fn save(&self, kv: &dyn KvStore) -> Result<SavedHistory> {
Ok(SavedHistory {
operations: self.operations.save(kv).await?,
next_operation_id: self.next_operation_id,
max_operation_ids: self.max_operation_ids.save(kv).await?,
deferred_operations: self.deferred_operations.save(kv).await?,
})
}
pub fn replica_id(&self) -> ReplicaId {
self.next_operation_id.replica_id
}
pub fn next_operation_id(&mut self) -> OperationId {
self.next_operation_id.tick()
}
pub fn max_operation_ids(&self) -> &btree::Map<ReplicaId, OperationCount> {
&self.max_operation_ids
}
pub async fn insert(
&mut self,
operation: Operation,
kv: &dyn KvStore,
) -> Result<SmallVec<[Operation; 1]>> {
let op_id = operation.id();
self.next_operation_id.observe(op_id);
if self
.max_operation_ids
.load(&op_id.replica_id, kv)
.await?
.copied()
< Some(op_id.operation_count)
{
self.max_operation_ids
.insert(op_id.replica_id, op_id.operation_count);
}
self.operations.store(op_id, operation, kv).await?;
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&op_id)
})
.await?;
let mut cursor = self.deferred_operations.cursor::<OperationId>();
let mut remaining = cursor.slice(&op_id, Bias::Left, &());
let mut flushed = SmallVec::new();
flushed.extend(
cursor
.slice(&op_id, Bias::Right, &())
.iter()
.map(|deferred| deferred.operation.clone()),
);
remaining.append(cursor.suffix(&()), &());
drop(cursor);
self.deferred_operations = remaining;
Ok(flushed)
}
pub fn insert_local(&mut self, operation: Operation) {
let id = operation.id();
self.next_operation_id.observe(operation.id());
self.max_operation_ids
.insert(id.replica_id, id.operation_count);
self.operations.insert(id, operation);
}
pub async fn defer(&mut self, operation: Operation, kv: &dyn KvStore) -> Result<()> {
for parent in operation.parent().iter() {
self.deferred_operations
.load(kv, &(), |probe| {
let key_range = (
Bound::Excluded(*probe.start),
Bound::Included(*probe.summary),
);
key_range.contains(&operation.id())
})
.await?;
self.deferred_operations.insert_or_replace(
DeferredOperation {
parent: *parent,
operation: operation.clone(),
},
&(),
);
}
Ok(())
}
pub async fn can_apply(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
for parent in operation.parent().iter() {
if self.operations.load(parent, kv).await?.is_none() {
return Ok(false);
}
}
Ok(true)
}
pub async fn has_applied(&mut self, operation: &Operation, kv: &dyn KvStore) -> Result<bool> {
Ok(self.operations.load(&operation.id(), kv).await?.is_some())
}
pub async fn operation(
&mut self,
id: OperationId,
kv: &dyn KvStore,
) -> Result<Option<&Operation>> {
self.operations.load(&id, kv).await
}
pub async fn operations_since(
&mut self,
version: &btree::Map<ReplicaId, OperationCount>,
kv: &dyn KvStore,
) -> Result<Vec<Operation>> {
let mut new_operations = Vec::new();
for (replica_id, end_op_count) in self.max_operation_ids.iter() {
let start_op = OperationId {
replica_id: *replica_id,
operation_count: version
.get(replica_id)
.map(|count| OperationCount(count.0 + 1))
.unwrap_or_default(),
};
let end_op = OperationId {
replica_id: *replica_id,
operation_count: *end_op_count,
};
new_operations.extend(
self.operations
.load_from(&start_op, kv)
.await?
.take_while(|(op_id, _)| **op_id <= end_op)
.map(|(_, op)| op.clone()),
);
}
Ok(new_operations)
}
pub async fn rewind(&mut self, revision_id: &RevisionId, kv: &dyn KvStore) -> Result<Rewind> {
let mut frontier = VecDeque::new();
let mut traversed = HashMap::default();
for operation_id in revision_id.iter() {
let parent_revision = self
.operation(*operation_id, kv)
.await?
.ok_or_else(|| anyhow!("operation {:?} not found", operation_id))?
.parent()
.clone();
traversed
.entry(parent_revision.clone())
.or_insert(BTreeSet::default())
.insert((revision_id.clone(), *operation_id));
frontier.push_back(Frontier {
source: *operation_id,
revision: parent_revision,
});
}
Ok(Rewind {
history: self,
frontier,
traversed,
ancestors: Default::default(),
reachable_len: revision_id.len(),
start: revision_id.clone(),
})
}
}
struct Frontier {
source: OperationId,
revision: RevisionId,
}
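/// Walks backward from a starting revision through each operation's parent
/// revision, yielding ancestor revisions that are reachable from *every*
/// operation in the current frontier. `replay` then re-traverses the
/// recorded edges forward from the last ancestor yielded.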
pub struct Rewind<'a> {
history: &'a mut History,
frontier: VecDeque<Frontier>,
traversed: HashMap<RevisionId, BTreeSet<(RevisionId, OperationId)>>,
ancestors: HashMap<RevisionId, HashSet<OperationId>>,
reachable_len: usize,
start: RevisionId,
}
impl Rewind<'_> {
pub async fn next(&mut self, kv: &dyn KvStore) -> Result<Option<RevisionId>> {
while let Some(frontier) = self.frontier.pop_front() {
let reachable_from = self.ancestors.entry(frontier.revision.clone()).or_default();
reachable_from.insert(frontier.source);
if reachable_from.len() == self.reachable_len {
self.reachable_len = frontier.revision.len();
self.frontier.clear();
self.ancestors.clear();
self.start = frontier.revision.clone();
for operation_id in frontier.revision.iter() {
let parent_revision = self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone();
self.traversed
.entry(parent_revision.clone())
.or_default()
.insert((frontier.revision.clone(), *operation_id));
self.frontier.push_back(Frontier {
source: *operation_id,
revision: parent_revision,
});
}
return Ok(Some(frontier.revision));
} else {
for operation_id in frontier.revision.iter() {
let parent_revision = self
.history
.operation(*operation_id, kv)
.await?
.expect("operation must exist")
.parent()
.clone();
self.traversed
.entry(parent_revision.clone())
.or_default()
.insert((frontier.revision.clone(), *operation_id));
self.frontier.push_back(Frontier {
source: frontier.source,
revision: parent_revision,
});
}
}
}
Ok(None)
}
pub fn replay(mut self) -> impl Iterator<Item = ReplayOperation> {
let mut stack = VecDeque::new();
if let Some(children) = self.traversed.remove(&self.start) {
for (child_revision_id, operation_id) in children {
stack.push_back(ReplayOperation {
parent_revision_id: self.start.clone(),
target_revision_id: child_revision_id.clone(),
operation_id,
});
}
}
iter::from_fn(move || {
let entry = stack.pop_front()?;
if let Some(children) = self.traversed.remove(&entry.target_revision_id) {
for (child_revision, operation_id) in children {
stack.push_back(ReplayOperation {
parent_revision_id: entry.target_revision_id.clone(),
target_revision_id: child_revision.clone(),
operation_id,
});
}
}
Some(entry)
})
}
}
#[derive(Clone, Eq, PartialEq)]
pub struct ReplayOperation {
pub parent_revision_id: RevisionId,
pub target_revision_id: RevisionId,
pub operation_id: OperationId,
}
impl std::fmt::Debug for ReplayOperation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:?} -> {:?} via {:?}",
self.parent_revision_id, self.target_revision_id, self.operation_id
)
}
}
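/// An operation deferred because a parent was missing, keyed by
/// (parent, operation id) with one entry per parent; entries are flushed
/// when their parent operation is inserted.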
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeferredOperation {
parent: OperationId,
operation: Operation,
}
impl PartialEq for DeferredOperation {
fn eq(&self, other: &Self) -> bool {
self.parent == other.parent && self.operation.id() == other.operation.id()
}
}
impl Eq for DeferredOperation {}
impl PartialOrd for DeferredOperation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DeferredOperation {
fn cmp(&self, other: &Self) -> Ordering {
self.parent
.cmp(&other.parent)
.then_with(|| self.operation.id().cmp(&other.operation.id()))
}
}
impl btree::Item for DeferredOperation {
type Summary = OperationId;
fn summary(&self) -> Self::Summary {
self.parent
}
}
impl btree::KeyedItem for DeferredOperation {
type Key = (OperationId, OperationId);
fn key(&self) -> Self::Key {
(self.parent, self.operation.id())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::btree::tests::InMemoryKv;
#[gpui::test]
async fn test_rewind() {
let kv = InMemoryKv::default();
let mut history = History::new(ReplicaId(0));
let op1 = insert_operation(&[], &mut history, &kv).await;
let op2 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op3 = insert_operation(&[op1.id()], &mut history, &kv).await;
let op4 = insert_operation(&[op2.id(), op3.id()], &mut history, &kv).await;
let op5 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op6 = insert_operation(&[op4.id()], &mut history, &kv).await;
let op7 = insert_operation(&[op2.id()], &mut history, &kv).await;
let op8 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op9 = insert_operation(&[op5.id()], &mut history, &kv).await;
let op10 = insert_operation(&[op8.id()], &mut history, &kv).await;
let op11 = insert_operation(&[op9.id(), op10.id()], &mut history, &kv).await;
assert_eq!(
rewind(&[op4.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op6.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op4.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op6.id()].as_slice()),
operation_id: op6.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op5.id(), op6.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op4.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op5.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id(), op6.id()].as_slice()),
operation_id: op6.id(),
}
]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op4.id(), op7.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op2.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op7.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id(), op7.id()].as_slice()),
operation_id: op4.id(),
},
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
assert_eq!(
rewind(&[op11.id()], &mut history, &kv).await,
&[
(
RevisionId::from([op9.id(), op10.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
target_revision_id: RevisionId::from([op11.id()].as_slice()),
operation_id: op11.id(),
}]
),
(
RevisionId::from([op5.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op8.id()].as_slice()),
operation_id: op8.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op5.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op9.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op8.id()].as_slice()),
target_revision_id: RevisionId::from([op9.id(), op10.id()].as_slice()),
operation_id: op10.id(),
}
]
),
(
RevisionId::from([op4.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op4.id()].as_slice()),
target_revision_id: RevisionId::from([op5.id()].as_slice()),
operation_id: op5.id(),
}]
),
(
RevisionId::from([op2.id(), op3.id()].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
target_revision_id: RevisionId::from([op4.id()].as_slice()),
operation_id: op4.id(),
}]
),
(
RevisionId::from([op1.id()].as_slice()),
vec![
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op2.id(),
},
ReplayOperation {
parent_revision_id: RevisionId::from([op1.id()].as_slice()),
target_revision_id: RevisionId::from([op2.id(), op3.id()].as_slice()),
operation_id: op3.id(),
}
]
),
(
RevisionId::from([].as_slice()),
vec![ReplayOperation {
parent_revision_id: RevisionId::from([].as_slice()),
target_revision_id: RevisionId::from([op1.id()].as_slice()),
operation_id: op1.id(),
}]
),
]
);
}
async fn insert_operation(
parent: &[OperationId],
history: &mut History,
kv: &dyn KvStore,
) -> Operation {
let operation = Operation::CreateBranch(crate::operations::CreateBranch {
id: history.next_operation_id(),
parent: parent.into(),
name: "1".into(),
});
history.insert(operation.clone(), kv).await.unwrap();
operation
}
async fn rewind(
revision_id: &[OperationId],
history: &mut History,
kv: &dyn KvStore,
) -> Vec<(RevisionId, Vec<ReplayOperation>)> {
let mut rewind = history.rewind(&revision_id.into(), kv).await.unwrap();
let mut results = Vec::new();
let mut prev_replay = Vec::new();
let mut ix = 0;
while let Some(ancestor_id) = rewind.next(kv).await.unwrap() {
let mut replay = rewind.replay().collect::<Vec<_>>();
let suffix_start = replay.len() - prev_replay.len();
assert_eq!(prev_replay, &replay[suffix_start..]);
prev_replay = replay.clone();
drop(replay.drain(suffix_start..));
results.push((ancestor_id, replay));
rewind = history.rewind(&revision_id.into(), kv).await.unwrap();
ix += 1;
for _ in 0..ix {
rewind.next(kv).await.unwrap();
}
}
results
}
}

crates/crdb/src/messages.rs (new file; 182 lines)

@@ -0,0 +1,182 @@
use crate::{
operations::{CreateBranch, CreateDocument, Edit},
BranchId, OperationCount, OperationId, ReplicaId, RepoId, Request, RevisionId, RoomCredentials,
};
use collections::BTreeMap;
use serde::{Deserialize, Serialize};
use std::{any::Any, sync::Arc};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RequestEnvelope {
PublishRepo(PublishRepo),
CloneRepo(CloneRepo),
ReconnectToRepo(ReconnectToRepo),
SyncRepo(SyncRepo),
PublishOperations(PublishOperations),
}
impl RequestEnvelope {
pub fn unwrap(self) -> Box<dyn Any> {
match self {
RequestEnvelope::PublishRepo(request) => Box::new(request),
RequestEnvelope::CloneRepo(request) => Box::new(request),
RequestEnvelope::ReconnectToRepo(request) => Box::new(request),
RequestEnvelope::SyncRepo(request) => Box::new(request),
RequestEnvelope::PublishOperations(request) => Box::new(request),
}
}
}
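// A receiver can recover the concrete request by downcasting, e.g. (sketch):
//
//     if let Ok(request) = envelope.unwrap().downcast::<PublishRepo>() {
//         // handle a PublishRepo request
//     }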
impl From<Operation> for MessageEnvelope {
fn from(value: Operation) -> Self {
Self::Operation(value)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishRepo {
pub id: RepoId,
pub name: Arc<str>,
}
impl Request for PublishRepo {
type Response = PublishRepoResponse;
}
impl From<PublishRepo> for RequestEnvelope {
    fn from(value: PublishRepo) -> Self {
        RequestEnvelope::PublishRepo(value)
    }
}
#[derive(Clone, Serialize, Deserialize)]
pub struct PublishRepoResponse {
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CloneRepo {
pub name: Arc<str>,
}
impl Request for CloneRepo {
type Response = CloneRepoResponse;
}
impl From<CloneRepo> for RequestEnvelope {
    fn from(value: CloneRepo) -> Self {
        RequestEnvelope::CloneRepo(value)
    }
}
#[derive(Clone, Serialize, Deserialize)]
pub struct CloneRepoResponse {
pub repo_id: RepoId,
pub replica_id: ReplicaId,
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReconnectToRepo {
pub id: RepoId,
pub replica_id: ReplicaId,
}
impl Request for ReconnectToRepo {
type Response = ReconnectToRepoResponse;
}
impl From<ReconnectToRepo> for RequestEnvelope {
    fn from(value: ReconnectToRepo) -> Self {
        RequestEnvelope::ReconnectToRepo(value)
    }
}
#[derive(Clone, Serialize, Deserialize)]
pub struct ReconnectToRepoResponse {
pub credentials: RoomCredentials,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRepo {
pub id: RepoId,
pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
}
impl Request for SyncRepo {
type Response = SyncRepoResponse;
}
impl From<SyncRepo> for RequestEnvelope {
    fn from(value: SyncRepo) -> Self {
        RequestEnvelope::SyncRepo(value)
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRepoResponse {
pub operations: Vec<Operation>,
pub max_operation_ids: BTreeMap<ReplicaId, OperationCount>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishOperations {
pub repo_id: RepoId,
pub operations: Vec<Operation>,
}
impl Request for PublishOperations {
type Response = ();
}
impl From<PublishOperations> for RequestEnvelope {
    fn from(value: PublishOperations) -> Self {
        RequestEnvelope::PublishOperations(value)
    }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessageEnvelope {
Operation(Operation),
}
impl MessageEnvelope {
pub fn unwrap(self) -> Box<dyn Any> {
Box::new(match self {
MessageEnvelope::Operation(message) => message,
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Operation {
CreateDocument(CreateDocument),
Edit(Edit),
CreateBranch(CreateBranch),
}
impl Operation {
pub fn id(&self) -> OperationId {
match self {
Operation::CreateDocument(op) => op.id,
Operation::Edit(op) => op.id,
Operation::CreateBranch(op) => op.id,
}
}
pub fn branch_id(&self) -> BranchId {
match self {
Operation::CreateBranch(op) => op.id,
Operation::CreateDocument(op) => op.branch_id,
Operation::Edit(op) => op.branch_id,
}
}
pub fn parent(&self) -> &RevisionId {
match self {
Operation::CreateDocument(op) => &op.parent,
Operation::Edit(op) => &op.parent,
Operation::CreateBranch(op) => &op.parent,
}
}
}

286
crates/crdb/src/operations.rs Normal file
View File

@@ -0,0 +1,286 @@
use crate::{
btree::{self, Bias},
dense_id::DenseId,
AnchorRange, BranchId, DocumentFragment, DocumentFragmentSummary, DocumentId, DocumentMetadata,
InsertionFragment, OperationId, Revision, RevisionId, RopeBuilder, Tombstone,
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::{cmp, sync::Arc};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateBranch {
pub id: BranchId,
pub parent: RevisionId,
pub name: Arc<str>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateDocument {
pub id: DocumentId,
pub branch_id: BranchId,
pub parent: RevisionId,
}
impl CreateDocument {
pub fn apply(self, revision: &mut Revision) {
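// Seed the new document with an empty sentinel fragment at the minimum
// location, index that fragment in `insertion_fragments`, and register
// default metadata for the document.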
let mut cursor = revision.document_fragments.cursor::<DocumentId>();
let mut new_document_fragments = cursor.slice(&self.id, Bias::Right, &());
new_document_fragments.push(
DocumentFragment {
document_id: self.id,
location: DenseId::min(),
insertion_id: self.id,
insertion_subrange: 0..0,
tombstones: Default::default(),
undo_count: 0,
},
&(),
);
new_document_fragments.append(cursor.suffix(&()), &());
drop(cursor);
revision.document_fragments = new_document_fragments;
revision.insertion_fragments.insert_or_replace(
InsertionFragment {
insertion_id: self.id,
offset_in_insertion: 0,
fragment_location: DenseId::min(),
},
&(),
);
revision.document_metadata.insert(
self.id,
DocumentMetadata {
path: None,
last_change: self.id,
},
);
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Edit {
pub id: OperationId,
pub document_id: DocumentId,
pub branch_id: BranchId,
pub parent: RevisionId,
pub edits: SmallVec<[(AnchorRange, Arc<str>); 2]>,
}
impl Edit {
pub fn apply(self, parent_revision: &Revision, revision: &mut Revision) -> Result<()> {
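// Applies the edit by splicing the document's fragment tree: ranges that
// were visible in the parent revision are tombstoned, and each insertion
// becomes a new fragment ordered by `DenseId`.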
if self.edits.is_empty() {
return Ok(());
}
let mut old_fragments = revision
.document_fragments
.cursor::<DocumentFragmentSummary>();
// Slice to the start of the document to which this operation applies.
let mut new_fragments = old_fragments.slice(&self.document_id, Bias::Left, &());
let mut new_insertions = Vec::new();
let mut new_ropes = RopeBuilder::new(
revision.visible_text.cursor(0),
revision.hidden_text.cursor(0),
);
new_ropes.append(
new_fragments.summary().visible_len,
new_fragments.summary().hidden_len,
);
let mut insertion_offset = 0;
let mut current_fragment = old_fragments.item().cloned();
for (range, new_text) in self.edits {
// We need to tombstone the intersection of the edit's range with fragments that
// were visible in the operation's parent revision.
for mut parent_fragment in parent_revision
.visible_fragments_for_range(range.clone())?
.cloned()
{
// Intersect the parent fragment with the edit's range.
if parent_fragment.insertion_id == range.start_insertion_id {
parent_fragment.insertion_subrange.start = range.start_offset_in_insertion;
}
if parent_fragment.insertion_id == range.end_insertion_id {
parent_fragment.insertion_subrange.end = cmp::min(
parent_fragment.insertion_subrange.end,
range.end_offset_in_insertion,
);
}
// Find the locations of the parent fragment in the new revision.
for fragment_location in revision.fragment_locations(
parent_fragment.insertion_id,
parent_fragment.insertion_subrange,
) {
if let Some(fragment) = current_fragment.as_ref() {
// Advance to fragment_location if it is greater than the location of the current fragment.
if *fragment_location > fragment.location {
// Flush the remainder of current fragment.
if !fragment.insertion_subrange.is_empty() || fragment.is_sentinel() {
new_ropes.push_fragment(fragment, fragment.visible());
new_insertions
.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
new_fragments.push(fragment.clone(), &());
}
old_fragments.next(&());
// Append all fragments between the previous fragment and the new fragment_location.
let slice = old_fragments.slice(
&(self.document_id, fragment_location),
Bias::Left,
&(),
);
new_ropes
.append(slice.summary().visible_len, slice.summary().hidden_len);
new_fragments.append(slice, &());
current_fragment = old_fragments.item().cloned();
// We should always find a fragment when seeking to fragment_location.
debug_assert!(current_fragment.is_some());
}
}
// If the edit starts at the end of the current fragment, flush it.
if let Some(fragment) = current_fragment.as_ref() {
if fragment.insertion_id == range.start_insertion_id
&& fragment.insertion_subrange.end == range.start_offset_in_insertion
{
let fragment = current_fragment.take().unwrap();
new_ropes.push_fragment(&fragment, fragment.visible());
new_insertions
.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
new_fragments.push(fragment, &());
old_fragments.next(&());
current_fragment = old_fragments.item().and_then(|fragment| {
if fragment.document_id == self.document_id {
Some(fragment.clone())
} else {
None
}
});
}
}
if let Some(fragment) = current_fragment.take() {
// If we haven't advanced off the end, then the current fragment intersects
// the current edit's range.
let (prefix, mut intersection, suffix) = fragment.intersect(range.clone());
// If we have a prefix, push it.
if let Some(mut prefix) = prefix {
prefix.location = DenseId::between(
&new_fragments.summary().max_location,
&intersection.location,
);
new_insertions
.push(btree::Edit::Insert(InsertionFragment::new(&prefix)));
new_ropes.push_fragment(&prefix, prefix.visible());
new_fragments.push(prefix, &());
}
if let Some(suffix) = suffix {
intersection.location = DenseId::between(
&new_fragments.summary().max_location,
&suffix.location,
);
// If we still have a suffix, the next edit may be inside of it, so set it as
// the current fragment and continue the loop.
current_fragment = Some(suffix);
} else {
// Otherwise, advance to the next fragment if it's still part of the same document.
old_fragments.next(&());
if let Some(next_fragment) = old_fragments.item() {
if next_fragment.document_id == self.document_id {
current_fragment = Some(next_fragment.clone());
}
}
}
// Then tombstone the intersecting portion.
let was_visible = intersection.visible();
intersection.tombstones.push(Tombstone {
id: self.id,
undo_count: 0,
});
new_ropes.push_fragment(&intersection, was_visible);
new_insertions
.push(btree::Edit::Insert(InsertionFragment::new(&intersection)));
new_fragments.push(intersection, &());
}
}
}
// Move past insertions that are causally after the current operation.
while let Some(fragment) = current_fragment.as_ref() {
if fragment.insertion_id.is_causally_after(self.id) {
new_ropes.push_fragment(fragment, fragment.visible());
new_insertions.push(btree::Edit::Insert(InsertionFragment::new(fragment)));
new_fragments.push(fragment.clone(), &());
old_fragments.next(&());
current_fragment = old_fragments.item().and_then(|fragment| {
if fragment.document_id == self.document_id {
Some(fragment.clone())
} else {
None
}
});
} else {
break;
}
}
// Finally, insert a fragment containing the new text.
if !new_text.is_empty() {
let fragment = DocumentFragment {
document_id: self.document_id,
location: DenseId::between(
&new_fragments.summary().max_location,
current_fragment
.as_ref()
.map_or(DenseId::max_ref(), |fragment| &fragment.location),
),
insertion_id: self.id,
insertion_subrange: insertion_offset..insertion_offset + new_text.len(),
tombstones: Default::default(),
undo_count: 0,
};
new_insertions.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
new_ropes.push_str(new_text.as_ref());
new_fragments.push(fragment, &());
insertion_offset += new_text.len();
}
}
if let Some(fragment) = current_fragment {
if !fragment.insertion_subrange.is_empty() {
new_ropes.push_fragment(&fragment, fragment.visible());
new_insertions.push(btree::Edit::Insert(InsertionFragment::new(&fragment)));
new_fragments.push(fragment, &());
}
old_fragments.next(&());
}
let suffix = old_fragments.suffix(&());
drop(old_fragments);
new_ropes.append(suffix.summary().visible_len, suffix.summary().hidden_len);
let (visible_text, hidden_text) = new_ropes.finish();
revision.visible_text = visible_text;
revision.hidden_text = hidden_text;
new_fragments.append(suffix, &());
revision.document_fragments = new_fragments;
new_insertions.sort_unstable_by_key(|edit| edit.key());
new_insertions.dedup_by_key(|edit| edit.key());
revision.insertion_fragments.edit(new_insertions, &());
revision.check_invariants();
Ok(())
}
}

1443
crates/crdb/src/rope.rs Normal file

File diff suppressed because it is too large

51
crates/crdb/src/rope/offset_utf16.rs Normal file
View File

@@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use std::ops::{Add, AddAssign, Sub};
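// An offset into text measured in UTF-16 code units rather than bytes.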
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct OffsetUtf16(pub usize);
impl<'a> Add<&'a Self> for OffsetUtf16 {
type Output = Self;
fn add(self, other: &'a Self) -> Self::Output {
Self(self.0 + other.0)
}
}
impl Add for OffsetUtf16 {
type Output = Self;
fn add(self, other: Self) -> Self::Output {
Self(self.0 + other.0)
}
}
impl<'a> Sub<&'a Self> for OffsetUtf16 {
type Output = Self;
fn sub(self, other: &'a Self) -> Self::Output {
debug_assert!(*other <= self);
Self(self.0 - other.0)
}
}
impl Sub for OffsetUtf16 {
type Output = OffsetUtf16;
fn sub(self, other: Self) -> Self::Output {
debug_assert!(other <= self);
Self(self.0 - other.0)
}
}
impl<'a> AddAssign<&'a Self> for OffsetUtf16 {
fn add_assign(&mut self, other: &'a Self) {
self.0 += other.0;
}
}
impl AddAssign<Self> for OffsetUtf16 {
fn add_assign(&mut self, other: Self) {
self.0 += other.0;
}
}

129
crates/crdb/src/rope/point.rs Normal file
View File

@@ -0,0 +1,129 @@
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
ops::{Add, AddAssign, Sub},
};
#[derive(Clone, Copy, Default, Eq, PartialEq, Debug, Hash, Serialize, Deserialize)]
pub struct Point {
pub row: u32,
pub column: u32,
}
impl Point {
pub const MAX: Self = Self {
row: u32::MAX,
column: u32::MAX,
};
pub fn new(row: u32, column: u32) -> Self {
Point { row, column }
}
pub fn zero() -> Self {
Point::new(0, 0)
}
pub fn parse_str(s: &str) -> Self {
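// Returns the point just past the end of `s`: `row` is the index of the
// last line and `column` is that line's length in bytes.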
let mut point = Self::zero();
for (row, line) in s.split('\n').enumerate() {
point.row = row as u32;
point.column = line.len() as u32;
}
point
}
pub fn is_zero(&self) -> bool {
self.row == 0 && self.column == 0
}
pub fn saturating_sub(self, other: Self) -> Self {
if self < other {
Self::zero()
} else {
self - other
}
}
}
impl<'a> Add<&'a Self> for Point {
type Output = Point;
fn add(self, other: &'a Self) -> Self::Output {
self + *other
}
}
impl Add for Point {
type Output = Point;
fn add(self, other: Self) -> Self::Output {
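// Addition treats `other` as a relative offset: a zero-row offset extends
// the current column, while a multi-row offset moves down and adopts the
// other point's column.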
if other.row == 0 {
Point::new(self.row, self.column + other.column)
} else {
Point::new(self.row + other.row, other.column)
}
}
}
impl<'a> Sub<&'a Self> for Point {
type Output = Point;
fn sub(self, other: &'a Self) -> Self::Output {
self - *other
}
}
impl Sub for Point {
type Output = Point;
fn sub(self, other: Self) -> Self::Output {
debug_assert!(other <= self);
if self.row == other.row {
Point::new(0, self.column - other.column)
} else {
Point::new(self.row - other.row, self.column)
}
}
}
impl<'a> AddAssign<&'a Self> for Point {
fn add_assign(&mut self, other: &'a Self) {
*self += *other;
}
}
impl AddAssign<Self> for Point {
fn add_assign(&mut self, other: Self) {
if other.row == 0 {
self.column += other.column;
} else {
self.row += other.row;
self.column = other.column;
}
}
}
impl PartialOrd for Point {
fn partial_cmp(&self, other: &Point) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Point {
#[cfg(target_pointer_width = "64")]
fn cmp(&self, other: &Point) -> Ordering {
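// Pack (row, column) into a single usize so the comparison compiles to
// one integer compare on 64-bit targets.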
let a = (self.row as usize) << 32 | self.column as usize;
let b = (other.row as usize) << 32 | other.column as usize;
a.cmp(&b)
}
#[cfg(target_pointer_width = "32")]
fn cmp(&self, other: &Point) -> Ordering {
match self.row.cmp(&other.row) {
Ordering::Equal => self.column.cmp(&other.column),
comparison => comparison,
}
}
}

119
crates/crdb/src/rope/point_utf16.rs Normal file
View File

@@ -0,0 +1,119 @@
use std::{
cmp::Ordering,
ops::{Add, AddAssign, Sub},
};
#[derive(Clone, Copy, Default, Eq, PartialEq, Debug, Hash)]
pub struct PointUtf16 {
pub row: u32,
pub column: u32,
}
impl PointUtf16 {
pub const MAX: Self = Self {
row: u32::MAX,
column: u32::MAX,
};
pub fn new(row: u32, column: u32) -> Self {
PointUtf16 { row, column }
}
pub fn zero() -> Self {
PointUtf16::new(0, 0)
}
pub fn is_zero(&self) -> bool {
self.row == 0 && self.column == 0
}
pub fn saturating_sub(self, other: Self) -> Self {
if self < other {
Self::zero()
} else {
self - other
}
}
}
impl<'a> Add<&'a Self> for PointUtf16 {
type Output = PointUtf16;
fn add(self, other: &'a Self) -> Self::Output {
self + *other
}
}
impl Add for PointUtf16 {
type Output = PointUtf16;
fn add(self, other: Self) -> Self::Output {
if other.row == 0 {
PointUtf16::new(self.row, self.column + other.column)
} else {
PointUtf16::new(self.row + other.row, other.column)
}
}
}
impl<'a> Sub<&'a Self> for PointUtf16 {
type Output = PointUtf16;
fn sub(self, other: &'a Self) -> Self::Output {
self - *other
}
}
impl Sub for PointUtf16 {
type Output = PointUtf16;
fn sub(self, other: Self) -> Self::Output {
debug_assert!(other <= self);
if self.row == other.row {
PointUtf16::new(0, self.column - other.column)
} else {
PointUtf16::new(self.row - other.row, self.column)
}
}
}
impl<'a> AddAssign<&'a Self> for PointUtf16 {
fn add_assign(&mut self, other: &'a Self) {
*self += *other;
}
}
impl AddAssign<Self> for PointUtf16 {
fn add_assign(&mut self, other: Self) {
if other.row == 0 {
self.column += other.column;
} else {
self.row += other.row;
self.column = other.column;
}
}
}
impl PartialOrd for PointUtf16 {
fn partial_cmp(&self, other: &PointUtf16) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PointUtf16 {
#[cfg(target_pointer_width = "64")]
fn cmp(&self, other: &PointUtf16) -> Ordering {
let a = (self.row as usize) << 32 | self.column as usize;
let b = (other.row as usize) << 32 | other.column as usize;
a.cmp(&b)
}
#[cfg(target_pointer_width = "32")]
fn cmp(&self, other: &PointUtf16) -> Ordering {
match self.row.cmp(&other.row) {
Ordering::Equal => self.column.cmp(&other.column),
comparison => comparison,
}
}
}

58
crates/crdb/src/rope/unclipped.rs Normal file
View File

@@ -0,0 +1,58 @@
use super::{ChunkSummary, TextDimension, TextSummary};
use crate::btree;
use std::ops::{Add, AddAssign, Sub, SubAssign};
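// A position that has not yet been clipped to a valid location in the
// underlying text; for example, it may point past the end of a line.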
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Unclipped<T>(pub T);
impl<T> From<T> for Unclipped<T> {
fn from(value: T) -> Self {
Unclipped(value)
}
}
impl<'a, T: btree::Dimension<'a, ChunkSummary>> btree::Dimension<'a, ChunkSummary>
for Unclipped<T>
{
fn add_summary(&mut self, summary: &'a ChunkSummary, _: &()) {
self.0.add_summary(summary, &());
}
}
impl<T: TextDimension> TextDimension for Unclipped<T> {
fn from_text_summary(summary: &TextSummary) -> Self {
Unclipped(T::from_text_summary(summary))
}
fn add_assign(&mut self, other: &Self) {
TextDimension::add_assign(&mut self.0, &other.0);
}
}
impl<T: Add<T, Output = T>> Add<Unclipped<T>> for Unclipped<T> {
type Output = Unclipped<T>;
fn add(self, rhs: Unclipped<T>) -> Self::Output {
Unclipped(self.0 + rhs.0)
}
}
impl<T: Sub<T, Output = T>> Sub<Unclipped<T>> for Unclipped<T> {
type Output = Unclipped<T>;
fn sub(self, rhs: Unclipped<T>) -> Self::Output {
Unclipped(self.0 - rhs.0)
}
}
impl<T: AddAssign<T>> AddAssign<Unclipped<T>> for Unclipped<T> {
fn add_assign(&mut self, rhs: Unclipped<T>) {
self.0 += rhs.0;
}
}
impl<T: SubAssign<T>> SubAssign<Unclipped<T>> for Unclipped<T> {
fn sub_assign(&mut self, rhs: Unclipped<T>) {
self.0 -= rhs.0;
}
}

419
crates/crdb/src/sync.rs Normal file
View File

@@ -0,0 +1,419 @@
use crate::{
btree::{self, Bias},
messages::{Operation, PublishOperations},
OperationId,
};
use bromberg_sl2::HashMatrix;
use std::{
cmp::Ordering,
iter,
ops::{Range, RangeBounds},
};
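// A summary of a run of operations: `count` operations whose ids are hashed
// into `hash` with the composable `bromberg_sl2` hash, so equal digests
// imply equal operation prefixes with overwhelming probability.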
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Digest {
count: usize,
hash: HashMatrix,
}
impl btree::Item for Operation {
type Summary = OperationSummary;
fn summary(&self) -> Self::Summary {
OperationSummary {
max_id: self.id(),
digest: Digest {
count: 1,
hash: bromberg_sl2::hash_strict(&self.id().to_be_bytes()),
},
}
}
}
impl btree::KeyedItem for Operation {
type Key = OperationId;
fn key(&self) -> Self::Key {
self.id()
}
}
#[derive(Clone, Debug, Default)]
pub struct OperationSummary {
max_id: OperationId,
digest: Digest,
}
impl btree::Summary for OperationSummary {
type Context = ();
fn add_summary(&mut self, summary: &Self, _: &()) {
debug_assert!(self.max_id < summary.max_id);
self.max_id = summary.max_id;
self.digest.count += summary.digest.count;
self.digest.hash = self.digest.hash * summary.digest.hash;
}
}
impl btree::Dimension<'_, OperationSummary> for OperationId {
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
debug_assert!(*self < summary.max_id);
*self = summary.max_id;
}
}
impl btree::Dimension<'_, OperationSummary> for usize {
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
*self += summary.digest.count;
}
}
impl btree::Dimension<'_, OperationSummary> for Digest {
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
self.count += summary.digest.count;
self.hash = self.hash * summary.digest.hash;
}
}
struct SyncRequest {
digests: Vec<Digest>,
}
struct SyncResponse {
shared_prefix_end: usize,
operations: Vec<Operation>,
}
struct SyncStats {
server_operations: usize,
client_operations: usize,
}
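// The client sends digests of progressively shorter prefixes of its log. The
// server replies with every operation past the longest prefix whose digest it
// can reproduce, or with its entire log if no digest matches.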
fn sync_server(
operations: &mut btree::Sequence<Operation>,
sync_request: SyncRequest,
) -> SyncResponse {
for client_digest in sync_request.digests {
let server_digest = digest_for_range(operations, 0..client_digest.count);
if server_digest == client_digest {
return SyncResponse {
shared_prefix_end: server_digest.count,
operations: operations_for_range(operations, server_digest.count..)
.cloned()
.collect(),
};
}
}
SyncResponse {
shared_prefix_end: 0,
operations: operations.iter().cloned().collect(),
}
}
fn publish_operations(
server_operations: &mut btree::Sequence<Operation>,
request: PublishOperations,
) {
server_operations.edit(
request
.operations
.into_iter()
.map(btree::Edit::Insert)
.collect(),
&(),
);
}
fn sync_client(
client_operations: &mut btree::Sequence<Operation>,
server_operations: &mut btree::Sequence<Operation>,
min_shared_prefix_end: usize,
max_digest_count: usize,
) -> SyncStats {
let mut digests = Vec::new();
let mut digest_end_ix = client_operations.summary().digest.count;
// We will multiply by some factor less than 1 to produce digests
// over ever smaller digest ranges. The following formula ensures that
// we will produce `max_digest_count` digests, and that the last digest
// will go from `0` to `min_shared_prefix_end`.
// op_count * factor^max_digest_count = min_shared_prefix_end
// factor^max_digest_count = min_shared_prefix_end/op_count
// max_digest_count * log_2(factor) = log_2(min_shared_prefix_end/op_count)
// log_2(factor) = log_2(min_shared_prefix_end/op_count)/max_digest_count
// factor = 2^(log_2(min_shared_prefix_end/op_count)/max_digest_count)
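// As a worked example: with op_count = 4096, min_shared_prefix_end = 1024,
// and max_digest_count = 2, factor = 2^(log2(1024/4096)/2) = 2^-1 = 0.5, so
// we send digests of the prefixes 0..4096 and 0..2048.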
let factor = 2f64.powf(
(min_shared_prefix_end as f64 / digest_end_ix as f64).log2() / max_digest_count as f64,
);
for _ in 0..max_digest_count {
if digest_end_ix <= min_shared_prefix_end {
break;
}
digests.push(digest_for_range(client_operations, 0..digest_end_ix));
digest_end_ix = (digest_end_ix as f64 * factor).ceil() as usize; // 🪬
}
let server_response = sync_server(server_operations, SyncRequest { digests });
let new_ops_from_client = {
let mut new_ops_from_client = Vec::new();
let mut client_cursor = client_operations.cursor::<usize>();
let mut new_client_operations =
client_cursor.slice(&server_response.shared_prefix_end, Bias::Right, &());
let mut server_operations = server_response.operations.iter().peekable();
let mut new_ops_from_server = Vec::new();
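// Merge the suffix returned by the server with the client's local suffix in
// operation-id order; operations the server is missing are accumulated in
// `new_ops_from_client` and published back to it below.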
while let Some(server_op) = server_operations.peek() {
match client_cursor.item() {
Some(client_operation) => {
let comparison = server_op.id().cmp(&client_operation.id());
match comparison {
Ordering::Less => {
new_ops_from_server.push(server_operations.next().unwrap().clone());
}
_ => {
new_client_operations.extend(new_ops_from_server.drain(..), &());
new_client_operations.push(client_operation.clone(), &());
client_cursor.next(&());
if comparison == Ordering::Equal {
server_operations.next();
} else {
new_ops_from_client.push(client_operation.clone());
}
}
}
}
None => {
new_ops_from_server.push(server_operations.next().unwrap().clone());
}
}
}
new_client_operations.extend(new_ops_from_server, &());
let client_suffix = client_cursor.suffix(&());
new_client_operations.append(client_suffix.clone(), &());
drop(client_cursor);
*client_operations = new_client_operations;
new_ops_from_client.extend(client_suffix.iter().cloned());
new_ops_from_client
};
let sync_stats = SyncStats {
server_operations: server_response.operations.len(),
client_operations: new_ops_from_client.len(),
};
publish_operations(
server_operations,
PublishOperations {
repo_id: Default::default(),
operations: new_ops_from_client,
},
);
sync_stats
}
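// Produces the digest of `operations[range]` by seeking a cursor to the
// start of the range and summarizing items up to its end.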
fn digest_for_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> Digest {
let mut cursor = operations.cursor::<usize>();
cursor.seek(&range.start, Bias::Right, &());
cursor.summary(&range.end, Bias::Right, &())
}
fn operations_for_range<T: RangeBounds<usize>>(
operations: &btree::Sequence<Operation>,
range: T,
) -> impl Iterator<Item = &Operation> {
let mut cursor = operations.cursor::<usize>();
match range.start_bound() {
collections::Bound::Included(start) => {
cursor.seek(start, Bias::Right, &());
}
collections::Bound::Excluded(start) => {
cursor.seek(&(*start + 1), Bias::Right, &());
}
collections::Bound::Unbounded => cursor.next(&()),
}
iter::from_fn(move || {
if range.contains(cursor.start()) {
let operation = cursor.item()?;
cursor.next(&());
Some(operation)
} else {
None
}
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{operations, OperationCount};
use rand::prelude::*;
use std::env;
#[test]
fn test_sync() {
assert_sync(1..=10, 5..=10, 0, 16);
assert_sync(1..=10, 4..=10, 0, 16);
assert_sync(1..=10, 1..=5, 0, 16);
assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10], 0, 16);
assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12], 0, 16);
assert_sync(1..=10, 5..=14, 0, 16);
assert_sync(1..=80, (1..=70).chain(90..=100), 0, 16);
assert_sync(1..=1910, (1..=1900).chain(1910..=2000), 0, 16);
}
#[gpui::test(iterations = 100)]
fn test_random(mut rng: StdRng) {
let max_operations = env::var("OPERATIONS")
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
let min_shared_prefix_end = 1024;
let max_digest_count = 1024;
let mut connected = true;
let mut client_ops = btree::Sequence::new();
let mut server_ops = btree::Sequence::new();
let mut ideal_server_ops = 0;
let mut ideal_client_ops = 0;
let mut next_reconnection = None;
for ix in 1..=max_operations {
if connected && rng.gen_bool(0.0005) {
log::debug!("disconnecting at operation {}", ix);
connected = false;
let mut factor = 0.0005;
while rng.gen() {
factor *= 2.0;
}
let remaining_operations = max_operations - ix;
let disconnection_period = (remaining_operations as f64 * factor) as usize;
next_reconnection = Some(ix + disconnection_period);
log::debug!("disconnection period: {} operations", disconnection_period);
}
if next_reconnection == Some(ix) {
connected = true;
next_reconnection = None;
log::debug!("===============");
let stats = sync_client(
&mut client_ops,
&mut server_ops,
min_shared_prefix_end,
max_digest_count,
);
log::debug!(
"ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%",
ideal_server_ops,
stats.server_operations,
stats.server_operations.abs_diff(ideal_server_ops),
((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100.
);
log::debug!(
"ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%",
ideal_client_ops,
stats.client_operations,
stats.client_operations.abs_diff(ideal_client_ops),
((stats.client_operations as f64 / ideal_client_ops as f64) - 1.0) * 100.
);
assert_eq!(
client_ops.iter().map(|op| op.id()).collect::<Vec<_>>(),
server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
);
ideal_client_ops = 0;
ideal_server_ops = 0;
}
if connected {
client_ops.push(build_operation(ix), &());
server_ops.push(build_operation(ix), &());
} else if rng.gen_bool(0.95) {
ideal_server_ops += 1;
server_ops.push(build_operation(ix), &());
} else {
ideal_client_ops += 1;
client_ops.push(build_operation(ix), &());
}
}
log::debug!("============");
let stats = sync_client(
&mut client_ops,
&mut server_ops,
min_shared_prefix_end,
max_digest_count,
);
log::debug!(
"ideal server ops: {}, actual server ops: {}, abs error: {}, pct error: {:.3}%",
ideal_server_ops,
stats.server_operations,
stats.server_operations.abs_diff(ideal_server_ops),
((stats.server_operations as f64 / ideal_server_ops as f64) - 1.) * 100.
);
log::debug!(
"ideal client ops: {}, actual client ops: {}, abs error: {}, pct error: {:.3}%",
ideal_client_ops,
stats.client_operations,
stats.client_operations.abs_diff(ideal_client_ops),
((stats.client_operations as f64 / ideal_client_ops as f64) - 1.0) * 100.
);
assert_eq!(
client_ops.iter().map(|op| op.id()).collect::<Vec<_>>(),
server_ops.iter().map(|op| op.id()).collect::<Vec<_>>()
);
}
fn assert_sync(
client_ops: impl IntoIterator<Item = usize>,
server_ops: impl IntoIterator<Item = usize>,
min_shared_prefix_end: usize,
max_digest_count: usize,
) {
let client_ops = client_ops
.into_iter()
.map(build_operation)
.collect::<Vec<_>>();
let server_ops = server_ops
.into_iter()
.map(build_operation)
.collect::<Vec<_>>();
let mut client_operations = btree::Sequence::from_iter(client_ops, &());
let mut server_operations = btree::Sequence::from_iter(server_ops, &());
sync_client(
&mut client_operations,
&mut server_operations,
min_shared_prefix_end,
max_digest_count,
);
assert_eq!(
client_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>(),
server_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>()
);
}
fn build_operation(id: usize) -> Operation {
Operation::CreateBranch(operations::CreateBranch {
id: OperationId {
replica_id: Default::default(),
operation_count: OperationCount(id),
},
parent: Default::default(),
name: "".into(),
})
}
fn digest_counts(digests: &[Digest]) -> Vec<usize> {
digests.iter().map(|d| d.count).collect()
}
}

201
crates/crdb/src/test.rs Normal file
View File

@@ -0,0 +1,201 @@
use crate::{ClientNetwork, ClientRoom, RoomCredentials, RoomName, RoomToken, ServerNetwork, User};
use anyhow::{anyhow, Result};
use collections::BTreeMap;
use futures::{channel::mpsc, future::BoxFuture, FutureExt, StreamExt};
use gpui::BackgroundExecutor;
use parking_lot::Mutex;
use std::sync::Arc;
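// An in-memory network for tests: a shared `NetworkState` routes requests to
// a single registered handler and fans room broadcasts out to per-token
// inboxes, inserting random delays from the test executor along the way.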
pub struct TestNetwork(Arc<Mutex<NetworkState>>);
impl TestNetwork {
pub fn new(executor: BackgroundExecutor) -> Self {
Self(Arc::new(Mutex::new(NetworkState {
executor,
request_handler: None,
rooms: Default::default(),
})))
}
pub fn server(&self) -> TestServerNetwork {
TestServerNetwork(self.0.clone())
}
pub fn client(&self, login: impl Into<Arc<str>>) -> TestClientNetwork {
TestClientNetwork {
user: User {
login: login.into(),
},
network: self.0.clone(),
}
}
}
struct NetworkState {
executor: BackgroundExecutor,
request_handler:
Option<Box<dyn Send + Fn(User, Vec<u8>) -> Result<BoxFuture<'static, Result<Vec<u8>>>>>>,
rooms: BTreeMap<RoomName, Room>,
}
#[derive(Default)]
pub struct Room {
inboxes: BTreeMap<RoomToken, mpsc::UnboundedSender<Vec<u8>>>,
authorized_users: BTreeMap<RoomToken, Arc<str>>,
next_token_id: usize,
}
pub struct TestServerNetwork(Arc<Mutex<NetworkState>>);
impl ServerNetwork for TestServerNetwork {
fn create_room(&self, name: &RoomName) -> BoxFuture<Result<()>> {
let network = self.0.clone();
let room = name.clone();
async move {
let executor = network.lock().executor.clone();
executor.simulate_random_delay().await;
network.lock().rooms.insert(room, Default::default());
Ok(())
}
.boxed()
}
fn grant_room_access(&self, room: &RoomName, user: &str) -> RoomToken {
let mut network = self.0.lock();
let room = network.rooms.get_mut(&room).expect("room must exist");
let token_id = room.next_token_id;
room.next_token_id += 1;
let token = RoomToken(format!("{}/{}", token_id, user).into());
room.authorized_users.insert(token.clone(), user.into());
token
}
fn handle_requests<H, F>(&self, handle_request: H)
where
H: 'static + Send + Fn(User, Vec<u8>) -> Result<F>,
F: 'static + Send + futures::Future<Output = Result<Vec<u8>>>,
{
self.0.lock().request_handler = Some(Box::new(move |user, request| {
handle_request(user, request.clone()).map(FutureExt::boxed)
}));
}
}
pub struct TestClientNetwork {
user: User,
network: Arc<Mutex<NetworkState>>,
}
impl ClientNetwork for TestClientNetwork {
type Room = TestClientRoom;
fn request(&self, request: Vec<u8>) -> BoxFuture<Result<Vec<u8>>> {
let response =
self.network.lock().request_handler.as_ref().unwrap()(self.user.clone(), request);
async move { response?.await }.boxed()
}
fn room(&self, credentials: RoomCredentials) -> Self::Room {
TestClientRoom {
outbox: Default::default(),
credentials,
message_handler: Default::default(),
network: self.network.clone(),
}
}
}
pub struct TestClientRoom {
outbox: Option<mpsc::UnboundedSender<Vec<u8>>>,
credentials: RoomCredentials,
message_handler: Arc<Mutex<Option<Box<dyn Send + Fn(Vec<u8>)>>>>,
network: Arc<Mutex<NetworkState>>,
}
impl ClientRoom for TestClientRoom {
fn connect(&mut self) -> BoxFuture<Result<()>> {
assert!(
self.outbox.is_none(),
"client should not connect more than once"
);
let (inbox_tx, mut inbox_rx) = mpsc::unbounded();
{
let mut network = self.network.lock();
let room = network
.rooms
.get_mut(&self.credentials.name)
.expect("room should exist");
if !room.authorized_users.contains_key(&self.credentials.token) {
return std::future::ready(Err(anyhow!(
"token {:?} is not authorized to enter room {:?}",
self.credentials.token,
self.credentials.name
)))
.boxed();
}
let existing_inbox = room
.inboxes
.insert(self.credentials.token.clone(), inbox_tx);
assert!(
existing_inbox.is_none(),
"client should not connect twice with the same token"
);
}
let message_handler = self.message_handler.clone();
self.network
.lock()
.executor
.spawn(async move {
while let Some(message) = inbox_rx.next().await {
if let Some(handler) = message_handler.lock().as_ref() {
handler(message);
}
}
})
.detach();
// Send outbound messages to other clients in the room.
let (outbox_tx, mut outbox_rx) = mpsc::unbounded();
self.outbox = Some(outbox_tx);
let executor = self.network.lock().executor.clone();
let network = self.network.clone();
let credentials = self.credentials.clone();
self.network
.lock()
.executor
.spawn(async move {
while let Some(message) = outbox_rx.next().await {
let inboxes = network
.lock()
.rooms
.get(&credentials.name)
.map(|room| room.inboxes.clone());
if let Some(inboxes) = inboxes {
for (inbox_token, inbox) in inboxes {
executor.simulate_random_delay().await;
if inbox_token != credentials.token {
let _ = inbox.unbounded_send(message.clone());
}
}
}
}
})
.detach();
async { Ok(()) }.boxed()
}
fn broadcast(&self, message: Vec<u8>) {
let tx = self.outbox.as_ref().expect("must be connected");
tx.unbounded_send(message).expect("channel must be open");
}
fn handle_messages(&self, handle_message: impl 'static + Send + Fn(Vec<u8>)) {
self.message_handler
.lock()
.replace(Box::new(handle_message));
}
}