Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Antonio Scandurra
2023-08-03 18:09:42 +02:00
parent 26cb5e316d
commit eb79ea6ee5
2 changed files with 256 additions and 210 deletions

View File

@@ -6,7 +6,7 @@ use crate::{
};
use bromberg_sl2::HashMatrix;
#[derive(Clone, Default)]
#[derive(Clone, Default, PartialEq, Eq)]
pub struct Digest {
pub count: usize,
pub hash: HashMatrix,
@@ -74,52 +74,40 @@ impl DigestSequence {
}
}
pub fn items(&self) -> Vec<Digest> {
self.digests.items(&())
}
pub fn digest(&self, range: Range<usize>) -> (Digest, Digest) {
let start = cmp::min(range.start, self.digests.summary().count);
let end = cmp::min(range.end, self.digests.summary().count);
let mut cursor = self.digests.cursor::<usize>();
cursor.seek(&start, Bias::Right, &());
assert_eq!(*cursor.start(), start, "start is inside an unfilled range");
let left_digest_hash = cursor.summary(&end, Bias::Right, &());
let left_digest = Digest {
count: *cursor.start() - start,
hash: left_digest_hash,
};
let right_digest = if end == *cursor.start() {
left_digest.clone()
} else {
let digest = cursor.item().unwrap();
Digest {
count: cursor.end(&()) - start,
hash: left_digest.hash * digest.hash,
}
};
(left_digest, right_digest)
}
pub fn insert(&mut self, index: usize, digest: Digest) {
if index > self.digests.summary().count {
panic!("index out of bounds");
pub fn debug(&self) {
let mut start = 0;
print!("[");
for item in self.digests.iter() {
let end = start + item.count;
print!("{:?}, ", start..end);
start = end;
}
println!("]");
}
pub fn digest(&self, mut range: Range<usize>) -> Digest {
let mut count = range.len();
range.start = cmp::min(range.start, self.digests.summary().count);
range.end = cmp::min(range.end, self.digests.summary().count);
let mut cursor = self.digests.cursor::<usize>();
let mut new_digests = cursor.slice(&index, Bias::Right, &());
cursor.seek(&range.start, Bias::Right, &());
assert_eq!(
*cursor.start(),
index,
"tried to insert into an unfilled range"
range.start,
"start is not at the start of a digest range"
);
new_digests.push(digest, &());
new_digests.append(cursor.suffix(&()), &());
drop(cursor);
self.digests = new_digests;
let mut hash = cursor.summary(&range.end, Bias::Right, &());
if range.end > *cursor.start() {
let digest = cursor.item().unwrap();
hash = hash * digest.hash;
cursor.next(&());
count = cmp::max(*cursor.start() - range.start, count);
}
Digest { count, hash }
}
pub fn fill(&mut self, mut range: Range<usize>, digests: impl IntoIterator<Item = Digest>) {
pub fn splice(&mut self, mut range: Range<usize>, digests: impl IntoIterator<Item = Digest>) {
let max_index = self.digests.summary().count;
if range.start > max_index {
panic!("range out of bounds");
@@ -128,16 +116,12 @@ impl DigestSequence {
let mut cursor = self.digests.cursor::<usize>();
let mut new_digests = cursor.slice(&range.start, Bias::Right, &());
assert_eq!(
*cursor.start(),
range.start,
"start is inside an unfilled range"
);
assert_eq!(*cursor.start(), range.start, "start is nedigest range");
cursor.seek(&range.end, Bias::Right, &());
assert_eq!(
*cursor.start(),
range.end,
"end is inside an unfilled range"
"end is not at the start of a digest range"
);
new_digests.extend(digests, &());
new_digests.append(cursor.suffix(&()), &());

View File

@@ -1,13 +1,13 @@
use crate::{
btree::{self, Bias},
digest::DigestSequence,
digest::{Digest, DigestSequence},
messages::Operation,
OperationId,
};
use bromberg_sl2::HashMatrix;
use collections::VecDeque;
use std::{
cmp, iter,
cmp::{self, Ordering},
iter,
ops::{Range, RangeBounds},
};
@@ -17,8 +17,7 @@ impl btree::Item for Operation {
fn summary(&self) -> Self::Summary {
OperationSummary {
max_id: self.id(),
digest: self.id().digest(),
count: 1,
digest: Digest::from(self),
}
}
}
@@ -34,8 +33,7 @@ impl btree::KeyedItem for Operation {
#[derive(Clone, Debug, Default)]
pub struct OperationSummary {
max_id: OperationId,
digest: bromberg_sl2::HashMatrix,
count: usize,
digest: Digest,
}
impl btree::Summary for OperationSummary {
@@ -44,8 +42,7 @@ impl btree::Summary for OperationSummary {
fn add_summary(&mut self, summary: &Self, _: &()) {
debug_assert!(self.max_id < summary.max_id);
self.max_id = summary.max_id;
self.digest = self.digest * summary.digest;
self.count += summary.count;
Digest::add_summary(&mut self.digest, &summary.digest, &());
}
}
@@ -58,13 +55,13 @@ impl btree::Dimension<'_, OperationSummary> for OperationId {
impl btree::Dimension<'_, OperationSummary> for usize {
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
*self += summary.count;
*self += summary.digest.count;
}
}
impl btree::Dimension<'_, OperationSummary> for HashMatrix {
impl btree::Dimension<'_, OperationSummary> for Digest {
fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) {
*self = *self * summary.digest;
Digest::add_summary(self, &summary.digest, &());
}
}
@@ -73,158 +70,106 @@ struct RangeDigest {
digest: HashMatrix,
}
fn range_digests(
fn request_digests(
operations: &btree::Sequence<Operation>,
root_range: Range<usize>,
base: usize,
target_depth: u32,
) -> Vec<RangeDigest> {
struct Frame {
digest: RangeDigest,
depth: u32,
) -> Vec<Digest> {
let digest_count = base.pow(target_depth);
let subrange_len = (root_range.len() + digest_count - 1) / digest_count;
let mut digests = Vec::with_capacity(digest_count);
let mut subrange_start = root_range.start;
while subrange_start < root_range.end {
let subrange_end = subrange_start + subrange_len;
digests.push(digest_for_range(operations, subrange_start..subrange_end));
subrange_start = subrange_end;
}
let mut digests = Vec::new();
let mut cursor = operations.cursor::<usize>();
cursor.seek(&root_range.start, Bias::Right, &());
let root_digest = cursor.summary(&root_range.end, Bias::Right, &());
let mut queue = VecDeque::new();
queue.push_back(Frame {
depth: 0,
digest: RangeDigest {
range: root_range,
digest: root_digest,
},
});
while let Some(Frame { depth, digest }) = queue.pop_front() {
let range = digest.range.clone();
if depth == target_depth || range.len() <= base {
digests.push(digest);
continue;
}
let mut start = range.start;
let subrange_size = (range.len() + base - 1) / base;
for _ in 0..base {
let end = cmp::min(start + subrange_size, range.end);
cursor.seek(&start, Bias::Right, &());
let digest = cursor.summary(&end, Bias::Right, &());
queue.push_back(Frame {
depth: depth + 1,
digest: RangeDigest {
range: start..end,
digest,
},
});
start = end;
}
}
digests
}
fn sync(client: &mut btree::Sequence<Operation>, server: &mut btree::Sequence<Operation>) {
const BASE: usize = 2;
const DEPTH: u32 = 2;
let count = 2 * cmp::max(client.summary().count, server.summary().count);
const MIN_OPERATIONS: usize = 5;
let max_sync_range = 0..(client.summary().digest.count + server.summary().digest.count);
let mut stack = vec![max_sync_range];
let mut server_digests = DigestSequence::new();
let mut queue = Vec::new();
for digest_range in range_digests(server, 0..count, BASE, DEPTH)
.into_iter()
.rev()
{
queue.push(digest_range.range.clone());
let mut server_range = digest_range.range.clone();
server_range.start = cmp::min(server_range.start, server.summary().count);
server_range.end = cmp::min(server_range.end, server.summary().count);
if !server_range.is_empty() {
server_digests.insert(
0,
crate::digest::Digest::new(server_range.len(), digest_range.digest),
)
}
}
while let Some(mut range) = queue.pop() {
let (left_server_digest, right_server_digest) = server_digests.digest(range.clone());
let left_digest_range = range.start..range.start + left_server_digest.count;
let right_digest_range =
range.start..cmp::max(range.end, range.start + right_server_digest.count);
let client_right_digest = digest_in_range(&client, right_digest_range.clone());
let client_left_digest = digest_in_range(&client, left_digest_range.clone());
if client_right_digest == right_server_digest.hash {
let mut synced_end = 0;
while let Some(mut sync_range) = stack.pop() {
sync_range.start = cmp::max(sync_range.start, synced_end);
println!("visiting {:?}", sync_range);
server_digests.debug();
let server_digest = server_digests.digest(sync_range.clone());
sync_range.end = sync_range.start + server_digest.count;
println!("server digest range was {:?}", sync_range);
let client_digest = digest_for_range(client, sync_range.clone());
if client_digest == server_digest {
println!("digests are the same in {:?}", sync_range);
synced_end = sync_range.end;
continue;
} else if sync_range.len() > MIN_OPERATIONS {
println!("digests are not the same, recursing");
let digests = request_digests(server, sync_range.clone(), BASE, DEPTH);
server_digests.splice(sync_range, digests.iter().cloned());
let stack_ix = stack.len();
let mut start = 0;
for digest in digests {
let end = start + digest.count;
stack.push(start..end);
start = end;
}
stack[stack_ix..].reverse();
} else {
if client_left_digest == left_server_digest.hash {
range = left_digest_range.end..right_digest_range.end;
} else {
range = right_digest_range;
}
if range.len() > 128 {
let mut digests = Vec::new();
println!("roundtrip for digests");
for range in range_digests(server, range.clone(), BASE, DEPTH)
.into_iter()
.rev()
{
queue.push(range.range.clone());
digests.push(range);
}
server_digests.fill(
range,
digests.into_iter().rev().map(|digest_range| {
crate::digest::Digest::new(digest_range.range.len(), digest_range.digest)
}),
);
continue;
}
println!("roundtrip for operations {:?}", range);
let client_operations = operations_in_range(&client, range.clone());
let server_operations = operations_in_range(&server, range.clone())
.cloned()
.collect::<Vec<_>>();
server_digests.fill(range.clone(), server_operations.iter().map(|op| op.into()));
let mut server_ix = range.start;
let mut client_operations = client_operations.peekable();
let mut server_operations = server_operations.into_iter().peekable();
let mut missed_server_ops = Vec::new();
let mut missed_client_ops = Vec::new();
for _ in range.clone() {
let mut missed_server_ops = Vec::new();
let server_operations = request_operations(server, sync_range.clone());
server_digests.splice(
sync_range.clone(),
server_operations.iter().map(|op| op.into()),
);
let mut server_operations = server_operations.into_iter().peekable();
let mut client_operations = operations_for_range(client, sync_range.clone()).peekable();
let mut server_ix = sync_range.start;
for _ in sync_range.clone() {
match (client_operations.peek(), server_operations.peek()) {
(Some(client_operation), Some(server_operation)) => {
match client_operation.id().cmp(&server_operation.id()) {
cmp::Ordering::Less => {
Ordering::Less => {
let client_operation = client_operations.next().unwrap();
println!("server missed {:?}", client_operation.id());
missed_server_ops
.push(btree::Edit::Insert(client_operation.clone()));
server_digests.insert(server_ix, client_operation.into());
server_digests
.splice(server_ix..server_ix, [client_operation.into()]);
server_ix += 1;
}
cmp::Ordering::Equal => {
Ordering::Equal => {
client_operations.next().unwrap();
server_operations.next().unwrap();
server_ix += 1;
}
cmp::Ordering::Greater => {
Ordering::Greater => {
let server_operation = server_operations.next().unwrap();
println!("client missed {:?}", server_operation.id());
missed_client_ops.push(btree::Edit::Insert(server_operation));
}
}
}
(None, Some(_)) => {
let server_operation = server_operations.next().unwrap();
println!("client missed {:?}", server_operation.id());
missed_client_ops.push(btree::Edit::Insert(server_operation));
}
(Some(_), None) => {
let client_operation = client_operations.next().unwrap();
println!("server missed {:?}", client_operation.id());
missed_server_ops.push(btree::Edit::Insert(client_operation.clone()));
server_digests.insert(server_ix, client_operation.into());
server_digests.splice(server_ix..server_ix, [client_operation.into()]);
server_ix += 1;
}
(None, None) => break,
@@ -233,24 +178,148 @@ fn sync(client: &mut btree::Sequence<Operation>, server: &mut btree::Sequence<Op
drop(client_operations);
client.edit(missed_client_ops, &());
// Publish these over the network in real implementation.
server.edit(missed_server_ops, &());
synced_end = sync_range.end;
}
}
}
fn digest_in_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> HashMatrix {
// fn sync(client: &mut btree::Sequence<Operation>, server: &mut btree::Sequence<Operation>) {
// const BASE: usize = 2;
// const DEPTH: u32 = 3;
// let count = 2 * cmp::max(client.summary().count, server.summary().count);
// let mut server_digests = DigestSequence::new();
// let mut queue = Vec::new();
// queue.push(0..count);
// while let Some(mut range) = queue.pop() {
// server_digests.debug();
// let server_digest = server_digests.digest(range.clone());
// let server_digest_range = range.start..range.start + server_digest.count;
// let client_digest = digest_in_range(&client, left_digest_range.clone());
// if client_right_digest == right_server_digest.hash {
// continue;
// } else {
// if client_left_digest == left_server_digest.hash {
// range = left_digest_range.end..right_digest_range.end;
// } else {
// range = right_digest_range;
// }
// if range.len() > BASE.pow(DEPTH) {
// let mut digests = Vec::new();
// println!("roundtrip for digests in range {:?}", range);
// for range in request_digests(server, range.clone(), BASE, DEPTH)
// .into_iter()
// .rev()
// {
// queue.push(range.range.clone());
// digests.push(range);
// }
// println!("before");
// server_digests.debug();
// server_digests.splice(
// range,
// digests.into_iter().rev().filter_map(|digest_range| {
// let start = cmp::min(digest_range.range.start, server.summary().count);
// let end = cmp::min(digest_range.range.end, server.summary().count);
// if start < end {
// Some(crate::digest::Digest::new(end - start, digest_range.digest))
// } else {
// None
// }
// }),
// );
// println!("after");
// server_digests.debug();
// continue;
// }
// dbg!(&range);
// println!("roundtrip for operations {:?}", range);
// let client_operations = operations_in_range(&client, range.clone());
// let server_operations = operations_in_range(&server, range.clone())
// .cloned()
// .collect::<Vec<_>>();
// server_digests.splice(range.clone(), server_operations.iter().map(|op| op.into()));
// let mut server_ix = range.start;
// let mut client_operations = client_operations.peekable();
// let mut server_operations = server_operations.into_iter().peekable();
// let mut missed_server_ops = Vec::new();
// let mut missed_client_ops = Vec::new();
// for _ in range.clone() {
// match (client_operations.peek(), server_operations.peek()) {
// (Some(client_operation), Some(server_operation)) => {
// match client_operation.id().cmp(&server_operation.id()) {
// cmp::Ordering::Less => {
// let client_operation = client_operations.next().unwrap();
// println!("server missed {:?}", client_operation.id());
// missed_server_ops
// .push(btree::Edit::Insert(client_operation.clone()));
// server_digests
// .splice(server_ix..server_ix, [client_operation.into()]);
// server_ix += 1;
// }
// cmp::Ordering::Equal => {
// client_operations.next().unwrap();
// server_operations.next().unwrap();
// server_ix += 1;
// }
// cmp::Ordering::Greater => {
// let server_operation = server_operations.next().unwrap();
// println!("client missed {:?}", server_operation.id());
// missed_client_ops.push(btree::Edit::Insert(server_operation));
// }
// }
// }
// (None, Some(_)) => {
// let server_operation = server_operations.next().unwrap();
// println!("client missed {:?}", server_operation.id());
// missed_client_ops.push(btree::Edit::Insert(server_operation));
// }
// (Some(_), None) => {
// let client_operation = client_operations.next().unwrap();
// println!("server missed {:?}", client_operation.id());
// missed_server_ops.push(btree::Edit::Insert(client_operation.clone()));
// server_digests.splice(server_ix..server_ix, [client_operation.into()]);
// server_ix += 1;
// }
// (None, None) => break,
// }
// }
// drop(client_operations);
// client.edit(missed_client_ops, &());
// server.edit(missed_server_ops, &());
// }
// }
// }
fn digest_for_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> Digest {
let mut cursor = operations.cursor::<usize>();
cursor.seek(&range.start, Bias::Right, &());
cursor.summary(&range.end, Bias::Right, &())
let mut digest: Digest = cursor.summary(&range.end, Bias::Right, &());
digest.count = range.len();
digest
}
fn operations_in_range<T>(
fn request_operations<T: RangeBounds<usize>>(
operations: &btree::Sequence<Operation>,
range: T,
) -> impl Iterator<Item = &Operation>
where
T: RangeBounds<usize>,
{
) -> Vec<Operation> {
operations_for_range(operations, range).cloned().collect()
}
fn operations_for_range<T: RangeBounds<usize>>(
operations: &btree::Sequence<Operation>,
range: T,
) -> impl Iterator<Item = &Operation> {
let mut cursor = operations.cursor::<usize>();
match range.start_bound() {
collections::Bound::Included(start) => {
@@ -297,16 +366,13 @@ mod tests {
#[test]
fn test_sync() {
assert_sync(1..=10, 5..=10);
assert_sync(1..=10, 4..=10);
// assert_sync(1..=10, 5..=10);
// assert_sync(1..=10, 4..=10);
assert_sync(1..=10, 1..=5);
assert_sync(
(1..=10).filter(|ix| ix % 2 == 0),
(1..=10).filter(|ix| ix % 2 == 1),
);
assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]);
assert_sync(1..=10, 5..=14);
assert_sync(1..=10000, 1..=7000);
// assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]);
// assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]);
// assert_sync(1..=10, 5..=14);
// assert_sync(1..=10000, 1..=7000);
}
fn assert_sync(
@@ -348,41 +414,37 @@ mod tests {
}
#[test]
fn test_range_digests() {
fn test_request_digests() {
let operations = btree::Sequence::from_iter((1..=64).map(build_operation), &());
assert_eq!(ranges(&range_digests(&operations, 0..64, 2, 0,)), [0..64]);
assert_eq!(
ranges(&range_digests(&operations, 0..64, 2, 1)),
[0..32, 32..64]
digest_counts(&request_digests(&operations, 0..64, 2, 0,)),
[64]
);
assert_eq!(
ranges(&range_digests(&operations, 0..64, 2, 2)),
[0..16, 16..32, 32..48, 48..64]
digest_counts(&request_digests(&operations, 0..64, 2, 1)),
[32, 32]
);
assert_eq!(
ranges(&range_digests(&operations, 32..48, 2, 2)),
[32..36, 36..40, 40..44, 44..48]
digest_counts(&request_digests(&operations, 0..64, 2, 2)),
[16, 16, 16, 16]
);
assert_eq!(
digest_counts(&request_digests(&operations, 32..48, 2, 2)),
[4, 4, 4, 4]
);
assert_eq!(ranges(&range_digests(&operations, 0..64, 3, 0)), [0..64]);
assert_eq!(
ranges(&range_digests(&operations, 0..64, 3, 1)),
[0..22, 22..44, 44..64]
digest_counts(&request_digests(&operations, 0..64, 3, 0)),
[64]
);
assert_eq!(
ranges(&range_digests(&operations, 0..64, 3, 2)),
[
0..8,
8..16,
16..22,
22..30,
30..38,
38..44,
44..51,
51..58,
58..64
]
digest_counts(&request_digests(&operations, 0..64, 3, 1)),
[22, 22, 22]
);
assert_eq!(
digest_counts(&request_digests(&operations, 0..64, 3, 2)),
[8, 8, 8, 8, 8, 8, 8, 8]
);
}
@@ -397,7 +459,7 @@ mod tests {
})
}
fn ranges(digests: &[RangeDigest]) -> Vec<Range<usize>> {
digests.iter().map(|d| d.range.clone()).collect()
fn digest_counts(digests: &[Digest]) -> Vec<usize> {
digests.iter().map(|d| d.count).collect()
}
}