diff --git a/crates/crdb/src/digest.rs b/crates/crdb/src/digest.rs index 5999affc80..8c8d6d070d 100644 --- a/crates/crdb/src/digest.rs +++ b/crates/crdb/src/digest.rs @@ -74,15 +74,8 @@ impl DigestSequence { } } - pub fn debug(&self) { - let mut start = 0; - print!("["); - for item in self.digests.iter() { - let end = start + item.count; - print!("{:?}, ", start..end); - start = end; - } - println!("]"); + pub fn items(&self) -> Vec { + self.digests.items(&()) } pub fn operation_count(&self) -> usize { diff --git a/crates/crdb/src/sync.rs b/crates/crdb/src/sync.rs index 044dbbd25c..9ba77fd81b 100644 --- a/crates/crdb/src/sync.rs +++ b/crates/crdb/src/sync.rs @@ -114,42 +114,66 @@ fn sync( let mut server_digests = DigestSequence::new(); let digests = request_digests(server, 0..usize::MAX, base, depth, min_operations); server_digests.splice(0..0, digests.iter().cloned()); - let max_sync_range = 0..(client.summary().digest.count + server_digests.operation_count()); + let server_operation_count = server_digests.operation_count(); + let max_sync_range = 0..(client.summary().digest.count + server_operation_count); let mut stack = subdivide_range(max_sync_range, base, depth, min_operations).collect::>(); stack.reverse(); + let mut missed_server_ops = Vec::new(); + let mut server_end = 0; let mut synced_end = 0; while let Some(mut sync_range) = stack.pop() { + if server_end >= server_operation_count && sync_range.start >= client.summary().digest.count + { + // We've exhausted all operations from the client and the server, so we're done. + break; + } else if sync_range.end < synced_end { + // This range has already been synced, so we can skip it. 
+ continue; + } + sync_range.start = cmp::max(sync_range.start, synced_end); let server_digest = server_digests.digest(sync_range.clone()); sync_range.end = cmp::max(sync_range.start + server_digest.count, sync_range.end); + let server_range = server_end..server_end + sync_range.len(); + let client_digest = digest_for_range(client, sync_range.clone()); if client_digest == server_digest { - if client_digest.count == 0 { - break; - } - synced_end = sync_range.end; + server_end += server_digest.count; + } else if client_digest.count == 0 { + // Client has exhausted its operations, which means that we don't need to + // diff anymore and we can just fetch the remaining operations from the server. + break; } else if sync_range.len() > min_operations { - let digests = request_digests(server, sync_range.clone(), base, depth, min_operations); + // If there are still operations that we've missed from the server, subdivide + // them into chunks and request their digests. + let digests = if server_range.start < server_operation_count { + request_digests(server, server_range.clone(), base, depth, min_operations) + } else { + Vec::new() + }; server_digests.splice(sync_range.clone(), digests.iter().cloned()); let old_stack_len = stack.len(); + stack.extend(subdivide_range(sync_range, base, depth, min_operations)); stack[old_stack_len..].reverse(); } else { - let mut missed_client_ops = Vec::new(); - let mut missed_server_ops = Vec::new(); - - let server_operations = request_operations(server, sync_range.clone()); + // If there are still operations that we've missed from the server, fetch them. 
+ let server_operations = if server_range.start < server_operation_count { + request_operations(server, server_range.clone()) + } else { + Vec::new() + }; server_digests.splice( sync_range.clone(), server_operations.iter().map(|op| op.into()), ); + let mut missed_client_ops = Vec::new(); let mut server_operations = server_operations.into_iter().peekable(); let mut client_operations = operations_for_range(client, sync_range.clone()).peekable(); - let mut server_ix = sync_range.start; for _ in sync_range.clone() { match (client_operations.peek(), server_operations.peek()) { (Some(client_operation), Some(server_operation)) => { @@ -159,43 +183,54 @@ fn sync( missed_server_ops .push(btree::Edit::Insert(client_operation.clone())); server_digests - .splice(server_ix..server_ix, [client_operation.into()]); - server_ix += 1; + .splice(synced_end..synced_end, [client_operation.into()]); } Ordering::Equal => { client_operations.next().unwrap(); server_operations.next().unwrap(); - server_ix += 1; + server_end += 1; } Ordering::Greater => { let server_operation = server_operations.next().unwrap(); missed_client_ops.push(btree::Edit::Insert(server_operation)); + server_end += 1; } } } (None, Some(_)) => { let server_operation = server_operations.next().unwrap(); missed_client_ops.push(btree::Edit::Insert(server_operation)); + server_end += 1; } (Some(_), None) => { let client_operation = client_operations.next().unwrap(); missed_server_ops.push(btree::Edit::Insert(client_operation.clone())); - server_digests.splice(server_ix..server_ix, [client_operation.into()]); - server_ix += 1; + server_digests.splice(synced_end..synced_end, [client_operation.into()]); } (None, None) => break, } + + synced_end += 1; } drop(client_operations); client.edit(missed_client_ops, &()); - - // Publish these over the network in real implementation. - server.edit(missed_server_ops, &()); - - synced_end = sync_range.end; } } + + // Fetch the remainder of the server's operations in one shot. 
+ if server_end < server_operation_count { + let remaining_server_ops = request_operations(server, server_end..); + client.edit( + remaining_server_ops + .into_iter() + .map(btree::Edit::Insert) + .collect(), + &(), + ); + } + + server.edit(missed_server_ops, &()); } fn digest_for_range(operations: &btree::Sequence, range: Range) -> Digest { @@ -244,18 +279,55 @@ mod tests { #[test] fn test_sync() { - assert_sync(1..=10, 5..=10); - assert_sync(1..=10, 4..=10); - assert_sync(1..=10, 1..=5); - assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); - assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); - assert_sync(1..=10, 5..=14); + assert_sync_with_config(1..=10, 5..=10, 8, 3, 512); + assert_sync_with_config(1..=10, 4..=10, 8, 3, 512); + assert_sync_with_config(1..=10, 1..=5, 8, 3, 512); + assert_sync_with_config([1, 3, 5, 7, 9], [2, 4, 6, 8, 10], 8, 3, 512); + assert_sync_with_config( + [1, 2, 3, 4, 6, 7, 8, 9, 11, 12], + [4, 5, 6, 10, 12], + 8, + 3, + 512, + ); + assert_sync_with_config(1..=10, 5..=14, 8, 3, 512); + assert_sync_with_config(1..=80, (1..=70).chain(90..=100), 8, 3, 512); + assert_sync_with_config(1..=1910, (1..=1900).chain(1910..=2000), 8, 3, 512); + assert_sync_with_config(1..=190100, (1..=190000).chain(191000..=1000000), 8, 3, 512); } fn assert_sync( client_ops: impl IntoIterator, server_ops: impl IntoIterator, ) { + let client_ops = client_ops.into_iter().collect::>(); + let server_ops = server_ops.into_iter().collect::>(); + for base in 2..=16 { + for depth in 1..=4 { + for min_operations in 1..=16 { + assert_sync_with_config( + client_ops.clone(), + server_ops.clone(), + base, + depth, + min_operations, + ); + } + } + } + } + + fn assert_sync_with_config( + client_ops: impl IntoIterator, + server_ops: impl IntoIterator, + base: usize, + depth: u32, + min_operations: usize, + ) { + println!( + "base: {}, depth: {}, min_operations: {}", + base, depth, min_operations + ); let client_ops = client_ops .into_iter() .map(build_operation) @@ 
-264,35 +336,26 @@ mod tests { .into_iter() .map(build_operation) .collect::>(); - for base in 2..16 { - for depth in 1..=4 { - for min_operations in 1..16 { - println!( - "base: {}, depth: {}, min_operations: {}", - base, depth, min_operations - ); - let mut client_operations = btree::Sequence::from_iter(client_ops.clone(), &()); - let mut server_operations = btree::Sequence::from_iter(server_ops.clone(), &()); - sync( - &mut client_operations, - &mut server_operations, - base, - depth, - min_operations, - ); - assert_eq!( - client_operations - .iter() - .map(|op| op.id()) - .collect::>(), - server_operations - .iter() - .map(|op| op.id()) - .collect::>() - ); - } - } - } + let mut client_operations = btree::Sequence::from_iter(client_ops, &()); + let mut server_operations = btree::Sequence::from_iter(server_ops, &()); + sync( + &mut client_operations, + &mut server_operations, + base, + depth, + min_operations, + ); + + assert_eq!( + client_operations + .iter() + .map(|op| op.id()) + .collect::>(), + server_operations + .iter() + .map(|op| op.id()) + .collect::>() + ); } #[test]