Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Sloan
3825bff2f3 Add cx.observe_async to gpui
Co-authored-by: Nathan <nathan@zed.dev>
2025-03-18 00:55:51 -06:00
2 changed files with 108 additions and 7 deletions

View File

@@ -13,9 +13,9 @@ use std::{
use anyhow::{anyhow, Result};
use derive_more::{Deref, DerefMut};
use futures::{
channel::oneshot,
channel::{mpsc, oneshot},
future::{LocalBoxFuture, Shared},
Future, FutureExt,
Future, FutureExt, StreamExt,
};
use parking_lot::RwLock;
use slotmap::SlotMap;
@@ -429,7 +429,7 @@ impl App {
result
}
/// Arrange a callback to be invoked when the given entity calls `notify` on its respective context.
/// Registers a callback to be invoked when the given entity calls [`Context::notify`].
pub fn observe<W>(
&mut self,
entity: &Entity<W>,
@@ -444,6 +444,26 @@ impl App {
})
}
/// Registers a callback to be invoked when the given entity calls [`Context::notify`]. If there
/// are `notify` calls while the async callback is running, it is run again.
pub fn observe_async<W, Fut>(
&mut self,
entity: &Entity<W>,
mut on_notify: impl FnMut(Entity<W>, &mut App) -> Fut + 'static,
) -> Subscription
where
W: 'static,
Fut: 'static + Future<Output = ()>,
{
self.observe_async_internal(entity, move |e, cx| {
let task = on_notify(e, cx);
async move {
task.await;
true
}
})
}
pub(crate) fn detect_accessed_entities<R>(
&mut self,
callback: impl FnOnce(&mut App) -> R,
@@ -513,6 +533,59 @@ impl App {
)
}
pub(crate) fn observe_async_internal<W, Fut>(
&mut self,
entity: &Entity<W>,
mut on_notify: impl FnMut(Entity<W>, &mut App) -> Fut + 'static,
) -> Subscription
where
W: 'static,
Fut: 'static + Future<Output = bool>,
{
let (mut tx, mut rx) = mpsc::channel(1);
let entity_id = entity.entity_id();
let subscription = self.new_observer(
entity_id,
Box::new(move |_cx| match tx.try_send(()) {
Ok(()) => true,
Err(err) => {
if err.is_full() {
true
} else if err.is_disconnected() {
false
} else {
log::error!("TrySendError is neither full nor disconnected: {}", err);
// Halt observing to avoid spamming the logs.
false
}
}
}),
);
let handle = entity.downgrade();
self.spawn(|cx| async move {
loop {
let Some(()) = rx.next().await else {
break;
};
let Some(entity) = Entity::<W>::upgrade_from(&handle) else {
break;
};
let Ok(future) = cx.update(|cx| on_notify(entity, cx)) else {
break;
};
let should_continue = future.await;
if !should_continue {
break;
}
}
})
.detach();
subscription
}
/// Arrange for the given callback to be invoked whenever the given entity emits an event of a given type.
/// The callback is provided a handle to the emitting entity and a reference to the emitted event.
pub fn subscribe<T, Event>(
@@ -539,6 +612,7 @@ impl App {
self.defer(move |_| activate());
subscription
}
pub(crate) fn subscribe_internal<T, Evt>(
&mut self,
entity: &Entity<T>,
@@ -1216,7 +1290,7 @@ impl App {
)
}
/// Observe the release of a entity. The callback is invoked after the entity
/// Observe the release of an entity. The callback is invoked after the entity
/// has no more strong references but before it has been dropped.
pub fn observe_release<T>(
&self,
@@ -1237,7 +1311,7 @@ impl App {
subscription
}
/// Observe the release of a entity. The callback is invoked after the entity
/// Observe the release of an entity. The callback is invoked after the entity
/// has no more strong references but before it has been dropped.
pub fn observe_release_in<T>(
&self,

View File

@@ -46,8 +46,7 @@ impl<'a, T: 'static> Context<'a, T> {
self.entity_state.clone()
}
/// Arranges for the given function to be called whenever [`Context::notify`] is
/// called with the given entity.
/// Registers a callback to be invoked when the given entity calls [`Context::notify`].
pub fn observe<W>(
&mut self,
entity: &Entity<W>,
@@ -68,6 +67,34 @@ impl<'a, T: 'static> Context<'a, T> {
})
}
/// Registers an async callback to be invoked when the given entity calls [`Context::notify`].
/// If there are `notify` calls while the async callback is running, it is run again.
pub fn observe_async<W, Fut>(
&mut self,
entity: &Entity<W>,
mut on_notify: impl FnMut(&mut T, Entity<W>, &mut Context<'_, T>) -> Fut + 'static,
) -> Subscription
where
T: 'static,
W: 'static,
Fut: 'static + Future<Output = ()>,
{
let this = self.weak_entity();
self.app.observe_async_internal(entity, move |e, cx| {
let future = this
.upgrade()
.map(|this| this.update(cx, |this, cx| on_notify(this, e, cx)));
async move {
if let Some(future) = future {
future.await;
true
} else {
false
}
}
})
}
/// Subscribe to an event type from another entity
pub fn subscribe<T2, Evt>(
&mut self,