Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Sloan
aac524858e Remove util::extend_sorted and its remaining use in fs watching 2024-12-31 15:32:23 -07:00
4 changed files with 65 additions and 71 deletions

View File

@@ -10,6 +10,8 @@ use git::GitHostingProviderRegistry;
#[cfg(any(target_os = "linux", target_os = "freebsd"))] #[cfg(any(target_os = "linux", target_os = "freebsd"))]
use ashpd::desktop::trash; use ashpd::desktop::trash;
#[cfg(any(target_os = "linux", target_os = "freebsd"))] #[cfg(any(target_os = "linux", target_os = "freebsd"))]
use collections::BTreeSet;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
use smol::process::Command; use smol::process::Command;
#[cfg(any(target_os = "linux", target_os = "freebsd"))] #[cfg(any(target_os = "linux", target_os = "freebsd"))]
use std::fs::File; use std::fs::File;
@@ -120,6 +122,17 @@ pub trait Fs: Send + Sync {
path: &Path, path: &Path,
) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>; ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
async fn watch(
&self,
path: &Path,
latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = BTreeSet<PathEvent>>>>,
Arc<dyn Watcher>,
);
#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
async fn watch( async fn watch(
&self, &self,
path: &Path, path: &Path,
@@ -701,13 +714,14 @@ impl Fs for RealFs {
path: &Path, path: &Path,
latency: Duration, latency: Duration,
) -> ( ) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathEvent>>>>, Pin<Box<dyn Send + Stream<Item = BTreeSet<PathEvent>>>>,
Arc<dyn Watcher>, Arc<dyn Watcher>,
) { ) {
use collections::BTreeSet;
use parking_lot::Mutex; use parking_lot::Mutex;
let (tx, rx) = smol::channel::unbounded(); let (tx, rx) = smol::channel::unbounded();
let pending_paths: Arc<Mutex<Vec<PathEvent>>> = Default::default(); let pending_paths: Arc<Mutex<BTreeSet<PathEvent>>> = Default::default();
let watcher = Arc::new(linux_watcher::LinuxWatcher::new(tx, pending_paths.clone())); let watcher = Arc::new(linux_watcher::LinuxWatcher::new(tx, pending_paths.clone()));
if watcher.add(path).is_err() { if watcher.add(path).is_err() {
@@ -1938,6 +1952,7 @@ impl Fs for FakeFs {
Ok(Box::pin(futures::stream::iter(paths))) Ok(Box::pin(futures::stream::iter(paths)))
} }
#[cfg(not(any(target_os = "linux", target_os = "freebsd")))]
async fn watch( async fn watch(
&self, &self,
path: &Path, path: &Path,
@@ -1966,6 +1981,38 @@ impl Fs for FakeFs {
) )
} }
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
async fn watch(
&self,
path: &Path,
_: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = BTreeSet<PathEvent>>>>,
Arc<dyn Watcher>,
) {
self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded();
self.state.lock().event_txs.push(tx);
let path = path.to_path_buf();
let executor = self.executor.clone();
(
Box::pin(
futures::StreamExt::filter(rx, move |events| {
let result = events
.iter()
.any(|evt_path| evt_path.path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;
result
}
})
.map(|events| BTreeSet::from_iter(events.into_iter())),
),
Arc::new(FakeWatcher {}),
)
}
fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<dyn GitRepository>> { fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<dyn GitRepository>> {
let state = self.state.lock(); let state = self.state.lock();
let entry = state.read_path(abs_dot_git).unwrap(); let entry = state.read_path(abs_dot_git).unwrap();

View File

@@ -1,3 +1,4 @@
use collections::BTreeSet;
use notify::EventKind; use notify::EventKind;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
@@ -7,13 +8,13 @@ use crate::{PathEvent, PathEventKind, Watcher};
pub struct LinuxWatcher { pub struct LinuxWatcher {
tx: smol::channel::Sender<()>, tx: smol::channel::Sender<()>,
pending_path_events: Arc<Mutex<Vec<PathEvent>>>, pending_path_events: Arc<Mutex<BTreeSet<PathEvent>>>,
} }
impl LinuxWatcher { impl LinuxWatcher {
pub fn new( pub fn new(
tx: smol::channel::Sender<()>, tx: smol::channel::Sender<()>,
pending_path_events: Arc<Mutex<Vec<PathEvent>>>, pending_path_events: Arc<Mutex<BTreeSet<PathEvent>>>,
) -> Self { ) -> Self {
Self { Self {
tx, tx,
@@ -40,7 +41,7 @@ impl Watcher for LinuxWatcher {
EventKind::Remove(_) => Some(PathEventKind::Removed), EventKind::Remove(_) => Some(PathEventKind::Removed),
_ => None, _ => None,
}; };
let mut path_events = event let path_events = event
.paths .paths
.iter() .iter()
.filter_map(|event_path| { .filter_map(|event_path| {
@@ -52,17 +53,12 @@ impl Watcher for LinuxWatcher {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if !path_events.is_empty() { if !path_events.is_empty() {
path_events.sort();
let mut pending_paths = pending_paths.lock(); let mut pending_paths = pending_paths.lock();
if pending_paths.is_empty() { let was_empty = pending_paths.is_empty();
pending_paths.extend(path_events);
if was_empty {
tx.try_send(()).ok(); tx.try_send(()).ok();
} }
util::extend_sorted(
&mut *pending_paths,
path_events,
usize::MAX,
|a, b| a.path.cmp(&b.path),
);
} }
}) })
} }

View File

@@ -87,29 +87,6 @@ pub fn post_inc<T: From<u8> + AddAssign<T> + Copy>(value: &mut T) -> T {
prev prev
} }
/// Extend a sorted vector with a sorted sequence of items, maintaining the vector's sort order and
/// enforcing a maximum length. This also de-duplicates items. Sort the items according to the given callback. Before calling this,
/// both `vec` and `new_items` should already be sorted according to the `cmp` comparator.
pub fn extend_sorted<T, I, F>(vec: &mut Vec<T>, new_items: I, limit: usize, mut cmp: F)
where
I: IntoIterator<Item = T>,
F: FnMut(&T, &T) -> Ordering,
{
let mut start_index = 0;
for new_item in new_items {
if let Err(i) = vec[start_index..].binary_search_by(|m| cmp(m, &new_item)) {
let index = start_index + i;
if vec.len() < limit {
vec.insert(index, new_item);
} else if index < vec.len() {
vec.pop();
vec.insert(index, new_item);
}
start_index = index;
}
}
}
pub fn truncate_to_bottom_n_sorted_by<T, F>(items: &mut Vec<T>, limit: usize, compare: &F) pub fn truncate_to_bottom_n_sorted_by<T, F>(items: &mut Vec<T>, limit: usize, compare: &F)
where where
F: Fn(&T, &T) -> Ordering, F: Fn(&T, &T) -> Ordering,
@@ -740,20 +717,6 @@ pub fn word_consists_of_emojis(s: &str) -> bool {
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_extend_sorted() {
let mut vec = vec![];
extend_sorted(&mut vec, vec![21, 17, 13, 8, 1, 0], 5, |a, b| b.cmp(a));
assert_eq!(vec, &[21, 17, 13, 8, 1]);
extend_sorted(&mut vec, vec![101, 19, 17, 8, 2], 8, |a, b| b.cmp(a));
assert_eq!(vec, &[101, 21, 19, 17, 13, 8, 2, 1]);
extend_sorted(&mut vec, vec![1000, 19, 17, 9, 5], 8, |a, b| b.cmp(a));
assert_eq!(vec, &[1000, 101, 21, 19, 17, 13, 9, 8]);
}
#[test] #[test]
fn test_iife() { fn test_iife() {
fn option_returning_function() -> Option<()> { fn option_returning_function() -> Option<()> {

View File

@@ -6,7 +6,7 @@ mod worktree_tests;
use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
use anyhow::{anyhow, Context as _, Result}; use anyhow::{anyhow, Context as _, Result};
use clock::ReplicaId; use clock::ReplicaId;
use collections::{HashMap, HashSet, VecDeque}; use collections::{BTreeSet, HashMap, HashSet, VecDeque};
use fs::{copy_recursive, Fs, MTime, PathEvent, RemoveOptions, Watcher}; use fs::{copy_recursive, Fs, MTime, PathEvent, RemoveOptions, Watcher};
use futures::{ use futures::{
channel::{ channel::{
@@ -310,7 +310,7 @@ struct BackgroundScannerState {
/// if the same inode is discovered at a new path, or if the given /// if the same inode is discovered at a new path, or if the given
/// path is re-created after being deleted. /// path is re-created after being deleted.
removed_entries: HashMap<u64, Entry>, removed_entries: HashMap<u64, Entry>,
changed_paths: Vec<Arc<Path>>, changed_paths: BTreeSet<Arc<Path>>,
prev_snapshot: Snapshot, prev_snapshot: Snapshot,
git_hosting_provider_registry: Option<Arc<GitHostingProviderRegistry>>, git_hosting_provider_registry: Option<Arc<GitHostingProviderRegistry>>,
} }
@@ -2990,9 +2990,7 @@ impl BackgroundScannerState {
.edit(entries_by_path_edits, &()); .edit(entries_by_path_edits, &());
self.snapshot.entries_by_id.edit(entries_by_id_edits, &()); self.snapshot.entries_by_id.edit(entries_by_id_edits, &());
if let Err(ix) = self.changed_paths.binary_search(parent_path) { self.changed_paths.insert(parent_path.clone());
self.changed_paths.insert(ix, parent_path.clone());
}
#[cfg(test)] #[cfg(test)]
self.snapshot.check_invariants(false); self.snapshot.check_invariants(false);
@@ -4562,12 +4560,7 @@ impl BackgroundScanner {
} }
} }
util::extend_sorted( state.changed_paths.extend(relative_paths.iter().cloned());
&mut state.changed_paths,
relative_paths.iter().cloned(),
usize::MAX,
Ord::cmp,
);
} }
fn remove_repo_path(&self, path: &Path, snapshot: &mut LocalSnapshot) -> Option<()> { fn remove_repo_path(&self, path: &Path, snapshot: &mut LocalSnapshot) -> Option<()> {
@@ -4740,9 +4733,7 @@ impl BackgroundScanner {
let state = &mut self.state.lock(); let state = &mut self.state.lock();
for edit in &entries_by_path_edits { for edit in &entries_by_path_edits {
if let Edit::Insert(entry) = edit { if let Edit::Insert(entry) = edit {
if let Err(ix) = state.changed_paths.binary_search(&entry.path) { state.changed_paths.insert(entry.path.clone());
state.changed_paths.insert(ix, entry.path.clone());
}
} }
} }
@@ -4930,12 +4921,9 @@ impl BackgroundScanner {
.collect(); .collect();
// Apply the git status changes. // Apply the git status changes.
util::extend_sorted( state
&mut state.changed_paths, .changed_paths
changes.iter().map(|p| p.0.clone()), .extend(changes.iter().map(|p| p.0.clone()));
usize::MAX,
Ord::cmp,
);
state.snapshot.entries_by_path.edit(edits, &()); state.snapshot.entries_by_path.edit(edits, &());
log::trace!( log::trace!(
"applied git status updates for repo {:?} in {:?}", "applied git status updates for repo {:?} in {:?}",
@@ -4948,7 +4936,7 @@ impl BackgroundScanner {
&self, &self,
old_snapshot: &Snapshot, old_snapshot: &Snapshot,
new_snapshot: &Snapshot, new_snapshot: &Snapshot,
event_paths: &[Arc<Path>], event_paths: &BTreeSet<Arc<Path>>,
) -> UpdatedEntriesSet { ) -> UpdatedEntriesSet {
use BackgroundScannerPhase::*; use BackgroundScannerPhase::*;
use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};