Avoid mutating the server as we synchronize

This commit is contained in:
Antonio Scandurra
2023-08-04 11:09:44 +02:00
parent bd0a9eb704
commit 80f9b553c7
2 changed files with 121 additions and 65 deletions

View File

@@ -74,15 +74,8 @@ impl DigestSequence {
}
}
pub fn debug(&self) {
let mut start = 0;
print!("[");
for item in self.digests.iter() {
let end = start + item.count;
print!("{:?}, ", start..end);
start = end;
}
println!("]");
pub fn items(&self) -> Vec<Digest> {
self.digests.items(&())
}
pub fn operation_count(&self) -> usize {

View File

@@ -114,42 +114,66 @@ fn sync(
let mut server_digests = DigestSequence::new();
let digests = request_digests(server, 0..usize::MAX, base, depth, min_operations);
server_digests.splice(0..0, digests.iter().cloned());
let max_sync_range = 0..(client.summary().digest.count + server_digests.operation_count());
let server_operation_count = server_digests.operation_count();
let max_sync_range = 0..(client.summary().digest.count + server_operation_count);
let mut stack =
subdivide_range(max_sync_range, base, depth, min_operations).collect::<Vec<_>>();
stack.reverse();
let mut missed_server_ops = Vec::new();
let mut server_end = 0;
let mut synced_end = 0;
while let Some(mut sync_range) = stack.pop() {
if server_end >= server_operation_count && sync_range.start >= client.summary().digest.count
{
// We've exhaused all operations from the client and the server, so we're done.
break;
} else if sync_range.end < synced_end {
// This range has already been synced, so we can skip it.
continue;
}
sync_range.start = cmp::max(sync_range.start, synced_end);
let server_digest = server_digests.digest(sync_range.clone());
sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end);
let server_range = server_end..server_end + sync_range.len();
let client_digest = digest_for_range(client, sync_range.clone());
if client_digest == server_digest {
if client_digest.count == 0 {
break;
}
synced_end = sync_range.end;
server_end += server_digest.count;
} else if client_digest.count == 0 {
// Client has exhausted its operations, which means that the we don't need to
// diff anymore and we can just fetch the remaining operations from the server.
break;
} else if sync_range.len() > min_operations {
let digests = request_digests(server, sync_range.clone(), base, depth, min_operations);
// If there are still operations that we've missed from the server, subdivide
// them into chunks and request their digests.
let digests = if server_range.start < server_operation_count {
request_digests(server, server_range.clone(), base, depth, min_operations)
} else {
Vec::new()
};
server_digests.splice(sync_range.clone(), digests.iter().cloned());
let old_stack_len = stack.len();
stack.extend(subdivide_range(sync_range, base, depth, min_operations));
stack[old_stack_len..].reverse();
} else {
let mut missed_client_ops = Vec::new();
let mut missed_server_ops = Vec::new();
let server_operations = request_operations(server, sync_range.clone());
// If there are still operations that we've missed from the server, fetch them.
let server_operations = if server_range.start < server_operation_count {
request_operations(server, server_range.clone())
} else {
Vec::new()
};
server_digests.splice(
sync_range.clone(),
server_operations.iter().map(|op| op.into()),
);
let mut missed_client_ops = Vec::new();
let mut server_operations = server_operations.into_iter().peekable();
let mut client_operations = operations_for_range(client, sync_range.clone()).peekable();
let mut server_ix = sync_range.start;
for _ in sync_range.clone() {
match (client_operations.peek(), server_operations.peek()) {
(Some(client_operation), Some(server_operation)) => {
@@ -159,43 +183,54 @@ fn sync(
missed_server_ops
.push(btree::Edit::Insert(client_operation.clone()));
server_digests
.splice(server_ix..server_ix, [client_operation.into()]);
server_ix += 1;
.splice(synced_end..synced_end, [client_operation.into()]);
}
Ordering::Equal => {
client_operations.next().unwrap();
server_operations.next().unwrap();
server_ix += 1;
server_end += 1;
}
Ordering::Greater => {
let server_operation = server_operations.next().unwrap();
missed_client_ops.push(btree::Edit::Insert(server_operation));
server_end += 1;
}
}
}
(None, Some(_)) => {
let server_operation = server_operations.next().unwrap();
missed_client_ops.push(btree::Edit::Insert(server_operation));
server_end += 1;
}
(Some(_), None) => {
let client_operation = client_operations.next().unwrap();
missed_server_ops.push(btree::Edit::Insert(client_operation.clone()));
server_digests.splice(server_ix..server_ix, [client_operation.into()]);
server_ix += 1;
server_digests.splice(synced_end..synced_end, [client_operation.into()]);
}
(None, None) => break,
}
synced_end += 1;
}
drop(client_operations);
client.edit(missed_client_ops, &());
// Publish these over the network in real implementation.
server.edit(missed_server_ops, &());
synced_end = sync_range.end;
}
}
// Fetch the remainder of the server's operations in one shot.
if server_end < server_operation_count {
let remaining_server_ops = request_operations(server, server_end..);
client.edit(
remaining_server_ops
.into_iter()
.map(btree::Edit::Insert)
.collect(),
&(),
);
}
server.edit(missed_server_ops, &());
}
fn digest_for_range(operations: &btree::Sequence<Operation>, range: Range<usize>) -> Digest {
@@ -244,18 +279,55 @@ mod tests {
#[test]
fn test_sync() {
assert_sync(1..=10, 5..=10);
assert_sync(1..=10, 4..=10);
assert_sync(1..=10, 1..=5);
assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]);
assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]);
assert_sync(1..=10, 5..=14);
assert_sync_with_config(1..=10, 5..=10, 8, 3, 512);
assert_sync_with_config(1..=10, 4..=10, 8, 3, 512);
assert_sync_with_config(1..=10, 1..=5, 8, 3, 512);
assert_sync_with_config([1, 3, 5, 7, 9], [2, 4, 6, 8, 10], 8, 3, 512);
assert_sync_with_config(
[1, 2, 3, 4, 6, 7, 8, 9, 11, 12],
[4, 5, 6, 10, 12],
8,
3,
512,
);
assert_sync_with_config(1..=10, 5..=14, 8, 3, 512);
assert_sync_with_config(1..=80, (1..=70).chain(90..=100), 8, 3, 512);
assert_sync_with_config(1..=1910, (1..=1900).chain(1910..=2000), 8, 3, 512);
assert_sync_with_config(1..=190100, (1..=190000).chain(191000..=1000000), 8, 3, 512);
}
fn assert_sync(
client_ops: impl IntoIterator<Item = usize>,
server_ops: impl IntoIterator<Item = usize>,
) {
let client_ops = client_ops.into_iter().collect::<Vec<_>>();
let server_ops = server_ops.into_iter().collect::<Vec<_>>();
for base in 2..=16 {
for depth in 1..=4 {
for min_operations in 1..=16 {
assert_sync_with_config(
client_ops.clone(),
server_ops.clone(),
base,
depth,
min_operations,
);
}
}
}
}
fn assert_sync_with_config(
client_ops: impl IntoIterator<Item = usize>,
server_ops: impl IntoIterator<Item = usize>,
base: usize,
depth: u32,
min_operations: usize,
) {
println!(
"base: {}, depth: {}, min_operations: {}",
base, depth, min_operations
);
let client_ops = client_ops
.into_iter()
.map(build_operation)
@@ -264,35 +336,26 @@ mod tests {
.into_iter()
.map(build_operation)
.collect::<Vec<_>>();
for base in 2..16 {
for depth in 1..=4 {
for min_operations in 1..16 {
println!(
"base: {}, depth: {}, min_operations: {}",
base, depth, min_operations
);
let mut client_operations = btree::Sequence::from_iter(client_ops.clone(), &());
let mut server_operations = btree::Sequence::from_iter(server_ops.clone(), &());
sync(
&mut client_operations,
&mut server_operations,
base,
depth,
min_operations,
);
assert_eq!(
client_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>(),
server_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>()
);
}
}
}
let mut client_operations = btree::Sequence::from_iter(client_ops, &());
let mut server_operations = btree::Sequence::from_iter(server_ops, &());
sync(
&mut client_operations,
&mut server_operations,
base,
depth,
min_operations,
);
assert_eq!(
client_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>(),
server_operations
.iter()
.map(|op| op.id())
.collect::<Vec<_>>()
);
}
#[test]