zed/crates/text/src/network.rs
Nathan Sobo 1ae326432e Extract a scheduler crate from GPUI to enable unified integration testing of client and server code (#37326)
Extracts and cleans up GPUI's scheduler code into a new `scheduler`
crate, making it pluggable by external runtimes. This will enable
deterministic integration testing with cloud components by providing a
unified test scheduler across Zed and backend code. In Zed, it will
replace the existing GPUI scheduler for consistent async task management
across platforms.

## Changes

- **Core Implementation**: `TestScheduler` with seed-based
randomization, session tracking (`SessionId`), and foreground/background
task separation for reproducible testing.
- **Executors**: `ForegroundExecutor` (!Send, thread-local) and
`BackgroundExecutor` (Send, with blocking/timeout support) as
GPUI-compatible wrappers.
- **Clock and Timer**: Controllable `TestClock` and future-based `Timer`
for time-sensitive tests.
- **Testing APIs**: `once()`, `with_seed()`, and `many()` methods for
configurable test runs.
- **Dependencies**: Added `async-task`, `chrono`, `futures`, etc., with
updates to `Cargo.toml` and lock file.
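
The idea behind seed-based randomization can be illustrated with a minimal sketch. It is not the `scheduler` crate's API: `run_with_seed` and `first_failing_seed` are hypothetical names, and a small hand-written generator stands in for a real RNG so that the example needs no external crates. Scheduling order is drawn from a seeded generator, so the same seed always reproduces the same interleaving, and a failing seed can be replayed.

```rust
/// Runs `task_count` single-step tasks in an order chosen by a seeded
/// generator and returns that order. Hypothetical helper, not part of the
/// `scheduler` crate.
fn run_with_seed(seed: u64, task_count: usize) -> Vec<usize> {
    // Linear congruential generator state; an assumption made to keep the
    // sketch free of external crates.
    let mut state = seed;
    let mut ready: Vec<usize> = (0..task_count).collect();
    let mut order = Vec::with_capacity(task_count);
    while !ready.is_empty() {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // Pick one of the ready tasks pseudo-randomly and "run" it.
        let pick = ((state >> 33) as usize) % ready.len();
        order.push(ready.swap_remove(pick));
    }
    order
}

/// Tries every seed in `0..iterations` and returns the first one whose
/// execution order fails `check`, so that the failure can be replayed.
fn first_failing_seed(
    iterations: u64,
    task_count: usize,
    check: impl Fn(&[usize]) -> bool,
) -> Option<u64> {
    (0..iterations).find(|&seed| !check(&run_with_seed(seed, task_count)))
}
```

Calling `run_with_seed` twice with the same seed returns the same order, which is the property that makes a randomized test reproducible. A seed reported by `first_failing_seed` can be passed back to `run_with_seed` to replay the failing interleaving.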

## Benefits

- **Integration Testing**: Facilitates reliable async tests involving
cloud sessions, reducing flakiness via deterministic execution.
- **Pluggability**: Trait-based design (`Scheduler`) allows easy
integration into non-GPUI runtimes while maintaining GPUI compatibility.
- **Cleanup**: Refactors GPUI scheduler logic for clarity, correctness
(no `unwrap()`, proper error handling), and extensibility.

Follows Rust guidelines; run `./script/clippy` for verification.

- [x] Define and test a core scheduler that we think can power our cloud
code and GPUI
- [ ] Replace GPUI's scheduler


Release Notes:

- N/A

---------

Co-authored-by: Antonio Scandurra <me@as-cii.com>
2025-09-04 17:14:53 +02:00


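use std::fmt::Debug;

use clock::ReplicaId;
use collections::{BTreeMap, HashSet};

/// A simulated network in which messages broadcast between replicas may be
/// duplicated and delivered out of order.
pub struct Network<T: Clone, R: rand::Rng> {
    /// Pending messages for each known replica.
    inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
    /// Replicas that currently neither send nor receive broadcasts.
    disconnected_peers: HashSet<ReplicaId>,
    /// Source of randomness for duplication, ordering, and batch sizes.
    rng: R,
}

#[derive(Clone, Debug)]
struct Envelope<T: Clone> {
    message: T,
}
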
impl<T: Clone, R: rand::Rng> Network<T, R> {
    pub fn new(rng: R) -> Self {
        Network {
            inboxes: BTreeMap::default(),
            disconnected_peers: HashSet::default(),
            rng,
        }
    }

    /// Registers a peer with an empty inbox.
    pub fn add_peer(&mut self, id: ReplicaId) {
        self.inboxes.insert(id, Vec::new());
    }

    /// Marks the peer as disconnected and discards its pending messages.
    /// Panics if the peer has no inbox.
    pub fn disconnect_peer(&mut self, id: ReplicaId) {
        self.disconnected_peers.insert(id);
        self.inboxes.get_mut(&id).unwrap().clear();
    }

    /// Reconnects a disconnected peer and gives it a copy of
    /// `replicate_from`'s inbox. Panics if the peer was not disconnected.
    pub fn reconnect_peer(&mut self, id: ReplicaId, replicate_from: ReplicaId) {
        assert!(self.disconnected_peers.remove(&id));
        self.replicate(replicate_from, id);
    }

    pub fn is_disconnected(&self, id: ReplicaId) -> bool {
        self.disconnected_peers.contains(&id)
    }

    pub fn contains_disconnected_peers(&self) -> bool {
        !self.disconnected_peers.is_empty()
    }

    /// Copies the inbox of `old_replica_id` to `new_replica_id`, replacing any
    /// inbox the new replica already had. Panics if the old replica has no inbox.
    pub fn replicate(&mut self, old_replica_id: ReplicaId, new_replica_id: ReplicaId) {
        self.inboxes
            .insert(new_replica_id, self.inboxes[&old_replica_id].clone());
    }

    /// Returns `true` when every inbox is empty.
    pub fn is_idle(&self) -> bool {
        self.inboxes.values().all(|i| i.is_empty())
    }

    /// Queues `messages` for every connected peer other than `sender`.
    pub fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
        // Drop messages from disconnected peers.
        if self.disconnected_peers.contains(&sender) {
            return;
        }

        for (replica, inbox) in self.inboxes.iter_mut() {
            if *replica != sender && !self.disconnected_peers.contains(replica) {
                for message in &messages {
                    // Insert one to three copies of this message at random positions,
                    // potentially *before* the previous message sent by this peer, to
                    // simulate duplicated and out-of-order delivery.
                    for _ in 0..self.rng.random_range(1..4) {
                        let insertion_index = self.rng.random_range(0..inbox.len() + 1);
                        inbox.insert(
                            insertion_index,
                            Envelope {
                                message: message.clone(),
                            },
                        );
                    }
                }
            }
        }
    }

    /// Panics if `receiver` has no inbox.
    pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
        !self.inboxes[&receiver].is_empty()
    }

    /// Removes and returns a random-length prefix (possibly empty) of the
    /// receiver's inbox. Panics if `receiver` has no inbox.
    pub fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
        let inbox = self.inboxes.get_mut(&receiver).unwrap();
        let count = self.rng.random_range(0..inbox.len() + 1);
        inbox
            .drain(0..count)
            .map(|envelope| envelope.message)
            .collect()
    }
}
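
The delivery behaviour of `broadcast` and `receive` can be tried out in isolation with a rough, standard-library-only sketch. Everything in it is an assumption made for illustration: `SketchNetwork`, `Lcg`, and `deliver_all` are hypothetical names, peers are plain `u16` ids instead of `ReplicaId`, a small linear congruential generator replaces `rand::Rng`, and disconnection handling is left out.

```rust
use std::collections::BTreeMap;

/// Tiny linear congruential generator standing in for `rand::Rng`
/// (an assumption, so that the sketch needs no external crates).
struct Lcg(u64);

impl Lcg {
    /// Returns a pseudo-random value in `low..high`; requires `low < high`.
    fn range(&mut self, low: usize, high: usize) -> usize {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        low + ((self.0 >> 33) as usize) % (high - low)
    }
}

/// Hypothetical, simplified stand-in for `Network`.
struct SketchNetwork<T: Clone> {
    inboxes: BTreeMap<u16, Vec<T>>,
    rng: Lcg,
}

impl<T: Clone> SketchNetwork<T> {
    fn new(seed: u64) -> Self {
        SketchNetwork {
            inboxes: BTreeMap::new(),
            rng: Lcg(seed),
        }
    }

    fn add_peer(&mut self, id: u16) {
        self.inboxes.insert(id, Vec::new());
    }

    /// Delivers one to three copies of each message to every other peer,
    /// each copy at a random position in that peer's inbox.
    fn broadcast(&mut self, sender: u16, messages: &[T]) {
        for (peer, inbox) in self.inboxes.iter_mut() {
            if *peer == sender {
                continue;
            }
            for message in messages {
                for _ in 0..self.rng.range(1, 4) {
                    let index = self.rng.range(0, inbox.len() + 1);
                    inbox.insert(index, message.clone());
                }
            }
        }
    }

    /// Drains a random-length prefix (possibly empty) of the inbox.
    fn receive(&mut self, receiver: u16) -> Vec<T> {
        let inbox = self.inboxes.get_mut(&receiver).unwrap();
        let count = self.rng.range(0, inbox.len() + 1);
        inbox.drain(0..count).collect()
    }

    fn is_idle(&self) -> bool {
        self.inboxes.values().all(|inbox| inbox.is_empty())
    }
}

/// Broadcasts `messages` from peer 0, then drains peer 1 until the network
/// is idle, returning everything peer 1 received in arrival order.
fn deliver_all(seed: u64, messages: &[u32]) -> Vec<u32> {
    let mut network = SketchNetwork::new(seed);
    network.add_peer(0);
    network.add_peer(1);
    network.broadcast(0, messages);
    let mut received = Vec::new();
    while !network.is_idle() {
        received.extend(network.receive(1));
    }
    received
}
```

Each message arrives at least once and at most three times, in an order that depends only on the seed, so code consuming such a network needs to tolerate duplicated and reordered messages. Calling `deliver_all` twice with the same seed and messages returns the same sequence.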