diff --git a/crates/crdb/src/digest.rs b/crates/crdb/src/digest.rs index eea347ff16..46da43f126 100644 --- a/crates/crdb/src/digest.rs +++ b/crates/crdb/src/digest.rs @@ -6,7 +6,7 @@ use crate::{ }; use bromberg_sl2::HashMatrix; -#[derive(Clone, Default)] +#[derive(Clone, Default, PartialEq, Eq)] pub struct Digest { pub count: usize, pub hash: HashMatrix, @@ -74,52 +74,40 @@ impl DigestSequence { } } - pub fn items(&self) -> Vec { - self.digests.items(&()) - } - - pub fn digest(&self, range: Range) -> (Digest, Digest) { - let start = cmp::min(range.start, self.digests.summary().count); - let end = cmp::min(range.end, self.digests.summary().count); - let mut cursor = self.digests.cursor::(); - cursor.seek(&start, Bias::Right, &()); - assert_eq!(*cursor.start(), start, "start is inside an unfilled range"); - let left_digest_hash = cursor.summary(&end, Bias::Right, &()); - let left_digest = Digest { - count: *cursor.start() - start, - hash: left_digest_hash, - }; - let right_digest = if end == *cursor.start() { - left_digest.clone() - } else { - let digest = cursor.item().unwrap(); - Digest { - count: cursor.end(&()) - start, - hash: left_digest.hash * digest.hash, - } - }; - (left_digest, right_digest) - } - - pub fn insert(&mut self, index: usize, digest: Digest) { - if index > self.digests.summary().count { - panic!("index out of bounds"); + pub fn debug(&self) { + let mut start = 0; + print!("["); + for item in self.digests.iter() { + let end = start + item.count; + print!("{:?}, ", start..end); + start = end; } + println!("]"); + } + pub fn digest(&self, mut range: Range) -> Digest { + let mut count = range.len(); + range.start = cmp::min(range.start, self.digests.summary().count); + range.end = cmp::min(range.end, self.digests.summary().count); let mut cursor = self.digests.cursor::(); - let mut new_digests = cursor.slice(&index, Bias::Right, &()); + cursor.seek(&range.start, Bias::Right, &()); assert_eq!( *cursor.start(), - index, - "tried to insert into an unfilled range" + range.start, + "start is not at the start of a digest range" ); - new_digests.push(digest, &()); - new_digests.append(cursor.suffix(&()), &()); - drop(cursor); - self.digests = new_digests; + let mut hash = cursor.summary(&range.end, Bias::Right, &()); + + if range.end > *cursor.start() { + let digest = cursor.item().unwrap(); + hash = hash * digest.hash; + cursor.next(&()); + count = cmp::max(*cursor.start() - range.start, count); + } + Digest { count, hash } } - pub fn fill(&mut self, mut range: Range, digests: impl IntoIterator) { + pub fn splice(&mut self, mut range: Range, digests: impl IntoIterator) { let max_index = self.digests.summary().count; if range.start > max_index { panic!("range out of bounds"); @@ -128,16 +116,12 @@ impl DigestSequence { let mut cursor = self.digests.cursor::(); let mut new_digests = cursor.slice(&range.start, Bias::Right, &()); - assert_eq!( - *cursor.start(), - range.start, - "start is inside an unfilled range" - ); + assert_eq!(*cursor.start(), range.start, "start is nedigest range"); cursor.seek(&range.end, Bias::Right, &()); assert_eq!( *cursor.start(), range.end, - "end is inside an unfilled range" + "end is not at the start of a digest range" ); new_digests.extend(digests, &()); new_digests.append(cursor.suffix(&()), &()); diff --git a/crates/crdb/src/sync.rs b/crates/crdb/src/sync.rs index 0488af94c2..3182990da4 100644 --- a/crates/crdb/src/sync.rs +++ b/crates/crdb/src/sync.rs @@ -1,13 +1,13 @@ use crate::{ btree::{self, Bias}, - digest::DigestSequence, + digest::{Digest, DigestSequence}, messages::Operation, OperationId, }; use bromberg_sl2::HashMatrix; -use collections::VecDeque; use std::{ - cmp, iter, + cmp::{self, Ordering}, + iter, ops::{Range, RangeBounds}, }; @@ -17,8 +17,7 @@ impl btree::Item for Operation { fn summary(&self) -> Self::Summary { OperationSummary { max_id: self.id(), - digest: self.id().digest(), - count: 1, + digest: Digest::from(self), } } } @@ -34,8 +33,7 @@ impl btree::KeyedItem for Operation { #[derive(Clone, Debug, Default)] pub struct OperationSummary { max_id: OperationId, - digest: bromberg_sl2::HashMatrix, - count: usize, + digest: Digest, } impl btree::Summary for OperationSummary { @@ -44,8 +42,7 @@ impl btree::Summary for OperationSummary { fn add_summary(&mut self, summary: &Self, _: &()) { debug_assert!(self.max_id < summary.max_id); self.max_id = summary.max_id; - self.digest = self.digest * summary.digest; - self.count += summary.count; + Digest::add_summary(&mut self.digest, &summary.digest, &()); } } @@ -58,13 +55,13 @@ impl btree::Dimension<'_, OperationSummary> for OperationId { impl btree::Dimension<'_, OperationSummary> for usize { fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) { - *self += summary.count; + *self += summary.digest.count; } } -impl btree::Dimension<'_, OperationSummary> for HashMatrix { +impl btree::Dimension<'_, OperationSummary> for Digest { fn add_summary(&mut self, summary: &'_ OperationSummary, _: &()) { - *self = *self * summary.digest; + Digest::add_summary(self, &summary.digest, &()); } } @@ -73,158 +70,106 @@ struct RangeDigest { digest: HashMatrix, } -fn range_digests( +fn request_digests( operations: &btree::Sequence, root_range: Range, base: usize, target_depth: u32, -) -> Vec { - struct Frame { - digest: RangeDigest, - depth: u32, +) -> Vec { + let digest_count = base.pow(target_depth); + let subrange_len = (root_range.len() + digest_count - 1) / digest_count; + + let mut digests = Vec::with_capacity(digest_count); + let mut subrange_start = root_range.start; + while subrange_start < root_range.end { + let subrange_end = subrange_start + subrange_len; + digests.push(digest_for_range(operations, subrange_start..subrange_end)); + subrange_start = subrange_end; } - - let mut digests = Vec::new(); - let mut cursor = operations.cursor::(); - cursor.seek(&root_range.start, Bias::Right, &()); - let root_digest = cursor.summary(&root_range.end, Bias::Right, &()); - let mut queue = VecDeque::new(); - queue.push_back(Frame { - depth: 0, - digest: RangeDigest { - range: root_range, - digest: root_digest, - }, - }); - - while let Some(Frame { depth, digest }) = queue.pop_front() { - let range = digest.range.clone(); - if depth == target_depth || range.len() <= base { - digests.push(digest); - continue; - } - - let mut start = range.start; - let subrange_size = (range.len() + base - 1) / base; - for _ in 0..base { - let end = cmp::min(start + subrange_size, range.end); - cursor.seek(&start, Bias::Right, &()); - let digest = cursor.summary(&end, Bias::Right, &()); - queue.push_back(Frame { - depth: depth + 1, - digest: RangeDigest { - range: start..end, - digest, - }, - }); - start = end; - } - } - digests } fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence) { const BASE: usize = 2; const DEPTH: u32 = 2; - let count = 2 * cmp::max(client.summary().count, server.summary().count); - + const MIN_OPERATIONS: usize = 5; + let max_sync_range = 0..(client.summary().digest.count + server.summary().digest.count); + let mut stack = vec![max_sync_range]; let mut server_digests = DigestSequence::new(); - let mut queue = Vec::new(); - for digest_range in range_digests(server, 0..count, BASE, DEPTH) - .into_iter() - .rev() - { - queue.push(digest_range.range.clone()); - let mut server_range = digest_range.range.clone(); - server_range.start = cmp::min(server_range.start, server.summary().count); - server_range.end = cmp::min(server_range.end, server.summary().count); - if !server_range.is_empty() { - server_digests.insert( - 0, - crate::digest::Digest::new(server_range.len(), digest_range.digest), - ) - } - } - while let Some(mut range) = queue.pop() { - let (left_server_digest, right_server_digest) = server_digests.digest(range.clone()); - let left_digest_range = range.start..range.start + left_server_digest.count; - let right_digest_range = - range.start..cmp::max(range.end, range.start + right_server_digest.count); - - let client_right_digest = digest_in_range(&client, right_digest_range.clone()); - let client_left_digest = digest_in_range(&client, left_digest_range.clone()); - if client_right_digest == right_server_digest.hash { + let mut synced_end = 0; + while let Some(mut sync_range) = stack.pop() { + sync_range.start = cmp::max(sync_range.start, synced_end); + println!("visiting {:?}", sync_range); + server_digests.debug(); + let server_digest = server_digests.digest(sync_range.clone()); + sync_range.end = sync_range.start + server_digest.count; + println!("server digest range was {:?}", sync_range); + let client_digest = digest_for_range(client, sync_range.clone()); + if client_digest == server_digest { + println!("digests are the same in {:?}", sync_range); + synced_end = sync_range.end; continue; + } else if sync_range.len() > MIN_OPERATIONS { + println!("digests are not the same, recursing"); + let digests = request_digests(server, sync_range.clone(), BASE, DEPTH); + server_digests.splice(sync_range, digests.iter().cloned()); + let stack_ix = stack.len(); + let mut start = 0; + for digest in digests { + let end = start + digest.count; + stack.push(start..end); + start = end; + } + stack[stack_ix..].reverse(); } else { - if client_left_digest == left_server_digest.hash { - range = left_digest_range.end..right_digest_range.end; - } else { - range = right_digest_range; - } - - if range.len() > 128 { - let mut digests = Vec::new(); - println!("roundtrip for digests"); - for range in range_digests(server, range.clone(), BASE, DEPTH) - .into_iter() - .rev() - { - queue.push(range.range.clone()); - digests.push(range); - } - server_digests.fill( - range, - digests.into_iter().rev().map(|digest_range| { - crate::digest::Digest::new(digest_range.range.len(), digest_range.digest) - }), - ); - continue; - } - - println!("roundtrip for operations {:?}", range); - let client_operations = operations_in_range(&client, range.clone()); - let server_operations = operations_in_range(&server, range.clone()) - .cloned() - .collect::>(); - server_digests.fill(range.clone(), server_operations.iter().map(|op| op.into())); - - let mut server_ix = range.start; - let mut client_operations = client_operations.peekable(); - let mut server_operations = server_operations.into_iter().peekable(); - let mut missed_server_ops = Vec::new(); let mut missed_client_ops = Vec::new(); - for _ in range.clone() { + let mut missed_server_ops = Vec::new(); + + let server_operations = request_operations(server, sync_range.clone()); + server_digests.splice( + sync_range.clone(), + server_operations.iter().map(|op| op.into()), + ); + + let mut server_operations = server_operations.into_iter().peekable(); + let mut client_operations = operations_for_range(client, sync_range.clone()).peekable(); + let mut server_ix = sync_range.start; + for _ in sync_range.clone() { match (client_operations.peek(), server_operations.peek()) { (Some(client_operation), Some(server_operation)) => { match client_operation.id().cmp(&server_operation.id()) { - cmp::Ordering::Less => { + Ordering::Less => { let client_operation = client_operations.next().unwrap(); + println!("server missed {:?}", client_operation.id()); missed_server_ops .push(btree::Edit::Insert(client_operation.clone())); - server_digests.insert(server_ix, client_operation.into()); + server_digests + .splice(server_ix..server_ix, [client_operation.into()]); server_ix += 1; } - cmp::Ordering::Equal => { + Ordering::Equal => { client_operations.next().unwrap(); server_operations.next().unwrap(); server_ix += 1; } - cmp::Ordering::Greater => { + Ordering::Greater => { let server_operation = server_operations.next().unwrap(); + println!("client missed {:?}", server_operation.id()); missed_client_ops.push(btree::Edit::Insert(server_operation)); } } } (None, Some(_)) => { let server_operation = server_operations.next().unwrap(); + println!("client missed {:?}", server_operation.id()); missed_client_ops.push(btree::Edit::Insert(server_operation)); } (Some(_), None) => { let client_operation = client_operations.next().unwrap(); + println!("server missed {:?}", client_operation.id()); missed_server_ops.push(btree::Edit::Insert(client_operation.clone())); - server_digests.insert(server_ix, client_operation.into()); + server_digests.splice(server_ix..server_ix, [client_operation.into()]); server_ix += 1; } (None, None) => break, @@ -233,24 +178,148 @@ fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence, range: Range) -> HashMatrix { +// fn sync(client: &mut btree::Sequence, server: &mut btree::Sequence) { +// const BASE: usize = 2; +// const DEPTH: u32 = 3; +// let count = 2 * cmp::max(client.summary().count, server.summary().count); + +// let mut server_digests = DigestSequence::new(); +// let mut queue = Vec::new(); +// queue.push(0..count); + +// while let Some(mut range) = queue.pop() { +// server_digests.debug(); +// let server_digest = server_digests.digest(range.clone()); +// let server_digest_range = range.start..range.start + server_digest.count; +// let client_digest = digest_in_range(&client, left_digest_range.clone()); +// if client_right_digest == right_server_digest.hash { +// continue; +// } else { +// if client_left_digest == left_server_digest.hash { +// range = left_digest_range.end..right_digest_range.end; +// } else { +// range = right_digest_range; +// } + +// if range.len() > BASE.pow(DEPTH) { +// let mut digests = Vec::new(); +// println!("roundtrip for digests in range {:?}", range); +// for range in request_digests(server, range.clone(), BASE, DEPTH) +// .into_iter() +// .rev() +// { +// queue.push(range.range.clone()); +// digests.push(range); +// } + +// println!("before"); +// server_digests.debug(); +// server_digests.splice( +// range, +// digests.into_iter().rev().filter_map(|digest_range| { +// let start = cmp::min(digest_range.range.start, server.summary().count); +// let end = cmp::min(digest_range.range.end, server.summary().count); +// if start < end { +// Some(crate::digest::Digest::new(end - start, digest_range.digest)) +// } else { +// None +// } +// }), +// ); +// println!("after"); +// server_digests.debug(); +// continue; +// } +// dbg!(&range); + +// println!("roundtrip for operations {:?}", range); +// let client_operations = operations_in_range(&client, range.clone()); +// let server_operations = operations_in_range(&server, range.clone()) +// .cloned() +// .collect::>(); +// server_digests.splice(range.clone(), server_operations.iter().map(|op| op.into())); + +// let mut server_ix = range.start; +// let mut client_operations = client_operations.peekable(); +// let mut server_operations = server_operations.into_iter().peekable(); +// let mut missed_server_ops = Vec::new(); +// let mut missed_client_ops = Vec::new(); +// for _ in range.clone() { +// match (client_operations.peek(), server_operations.peek()) { +// (Some(client_operation), Some(server_operation)) => { +// match client_operation.id().cmp(&server_operation.id()) { +// cmp::Ordering::Less => { +// let client_operation = client_operations.next().unwrap(); +// println!("server missed {:?}", client_operation.id()); +// missed_server_ops +// .push(btree::Edit::Insert(client_operation.clone())); +// server_digests +// .splice(server_ix..server_ix, [client_operation.into()]); +// server_ix += 1; +// } +// cmp::Ordering::Equal => { +// client_operations.next().unwrap(); +// server_operations.next().unwrap(); +// server_ix += 1; +// } +// cmp::Ordering::Greater => { +// let server_operation = server_operations.next().unwrap(); +// println!("client missed {:?}", server_operation.id()); +// missed_client_ops.push(btree::Edit::Insert(server_operation)); +// } +// } +// } +// (None, Some(_)) => { +// let server_operation = server_operations.next().unwrap(); +// println!("client missed {:?}", server_operation.id()); +// missed_client_ops.push(btree::Edit::Insert(server_operation)); +// } +// (Some(_), None) => { +// let client_operation = client_operations.next().unwrap(); +// println!("server missed {:?}", client_operation.id()); +// missed_server_ops.push(btree::Edit::Insert(client_operation.clone())); +// server_digests.splice(server_ix..server_ix, [client_operation.into()]); +// server_ix += 1; +// } +// (None, None) => break, +// } +// } + +// drop(client_operations); +// client.edit(missed_client_ops, &()); +// server.edit(missed_server_ops, &()); +// } +// } +// } + +fn digest_for_range(operations: &btree::Sequence, range: Range) -> Digest { let mut cursor = operations.cursor::(); cursor.seek(&range.start, Bias::Right, &()); - cursor.summary(&range.end, Bias::Right, &()) + let mut digest: Digest = cursor.summary(&range.end, Bias::Right, &()); + digest.count = range.len(); + digest } -fn operations_in_range( +fn request_operations>( operations: &btree::Sequence, range: T, -) -> impl Iterator -where - T: RangeBounds, -{ +) -> Vec { + operations_for_range(operations, range).cloned().collect() +} + +fn operations_for_range>( + operations: &btree::Sequence, + range: T, +) -> impl Iterator { let mut cursor = operations.cursor::(); match range.start_bound() { collections::Bound::Included(start) => { @@ -297,16 +366,13 @@ mod tests { #[test] fn test_sync() { - assert_sync(1..=10, 5..=10); - assert_sync(1..=10, 4..=10); + // assert_sync(1..=10, 5..=10); + // assert_sync(1..=10, 4..=10); assert_sync(1..=10, 1..=5); - assert_sync( - (1..=10).filter(|ix| ix % 2 == 0), - (1..=10).filter(|ix| ix % 2 == 1), - ); - assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); - assert_sync(1..=10, 5..=14); - assert_sync(1..=10000, 1..=7000); + // assert_sync([1, 3, 5, 7, 9], [2, 4, 6, 8, 10]); + // assert_sync([1, 2, 3, 4, 6, 7, 8, 9, 11, 12], [4, 5, 6, 10, 12]); + // assert_sync(1..=10, 5..=14); + // assert_sync(1..=10000, 1..=7000); } fn assert_sync( @@ -348,41 +414,37 @@ mod tests { } #[test] - fn test_range_digests() { + fn test_request_digests() { let operations = btree::Sequence::from_iter((1..=64).map(build_operation), &()); - assert_eq!(ranges(&range_digests(&operations, 0..64, 2, 0,)), [0..64]); assert_eq!( - ranges(&range_digests(&operations, 0..64, 2, 1)), - [0..32, 32..64] + digest_counts(&request_digests(&operations, 0..64, 2, 0,)), + [64] ); assert_eq!( - ranges(&range_digests(&operations, 0..64, 2, 2)), - [0..16, 16..32, 32..48, 48..64] + digest_counts(&request_digests(&operations, 0..64, 2, 1)), + [32, 32] ); assert_eq!( - ranges(&range_digests(&operations, 32..48, 2, 2)), - [32..36, 36..40, 40..44, 44..48] + digest_counts(&request_digests(&operations, 0..64, 2, 2)), + [16, 16, 16, 16] + ); + assert_eq!( + digest_counts(&request_digests(&operations, 32..48, 2, 2)), + [4, 4, 4, 4] ); - assert_eq!(ranges(&range_digests(&operations, 0..64, 3, 0)), [0..64]); assert_eq!( - ranges(&range_digests(&operations, 0..64, 3, 1)), - [0..22, 22..44, 44..64] + digest_counts(&request_digests(&operations, 0..64, 3, 0)), + [64] ); assert_eq!( - ranges(&range_digests(&operations, 0..64, 3, 2)), - [ - 0..8, - 8..16, - 16..22, - 22..30, - 30..38, - 38..44, - 44..51, - 51..58, - 58..64 - ] + digest_counts(&request_digests(&operations, 0..64, 3, 1)), + [22, 22, 22] + ); + assert_eq!( + digest_counts(&request_digests(&operations, 0..64, 3, 2)), + [8, 8, 8, 8, 8, 8, 8, 8] ); } @@ -397,7 +459,7 @@ mod tests { }) } - fn ranges(digests: &[RangeDigest]) -> Vec> { - digests.iter().map(|d| d.range.clone()).collect() + fn digest_counts(digests: &[Digest]) -> Vec { + digests.iter().map(|d| d.count).collect() } }