Begin more refactoring

This commit is contained in:
Conrad Irwin
2025-04-18 16:41:23 -06:00
parent ecae316e93
commit 59f99b062c
4 changed files with 295 additions and 199 deletions

View File

@@ -302,33 +302,6 @@ impl DebugPanel {
self.active_session.clone()
}
pub fn debug_panel_items_by_client(
&self,
client_id: &SessionId,
cx: &Context<Self>,
) -> Vec<Entity<DebugSession>> {
self.sessions
.iter()
.filter(|item| item.read(cx).session_id(cx) == *client_id)
.map(|item| item.clone())
.collect()
}
pub fn debug_panel_item_by_client(
&self,
client_id: SessionId,
cx: &mut Context<Self>,
) -> Option<Entity<DebugSession>> {
self.sessions
.iter()
.find(|item| {
let item = item.read(cx);
item.session_id(cx) == client_id
})
.cloned()
}
fn handle_dap_store_event(
&mut self,
dap_store: &Entity<DapStore>,

View File

@@ -31,6 +31,7 @@ use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
use lsp::LanguageServerName;
use node_runtime::NodeRuntime;
use remote::SshRemoteClient;
use rpc::{
AnyProtoClient, TypedEnvelope,
proto::{self},
@@ -83,19 +84,20 @@ pub struct LocalDapStore {
http_client: Arc<dyn HttpClient>,
environment: Entity<ProjectEnvironment>,
language_registry: Arc<LanguageRegistry>,
worktree_store: Entity<WorktreeStore>,
toolchain_store: Arc<dyn LanguageToolchainStore>,
locators: HashMap<String, Arc<dyn DapLocator>>,
}
pub struct SshDapStore {
upstream_client: AnyProtoClient,
ssh_client: Entity<SshRemoteClient>,
upstream_project_id: u64,
}
pub struct DapStore {
mode: DapStoreMode,
downstream_client: Option<(AnyProtoClient, u64)>,
worktree_store: Entity<WorktreeStore>,
breakpoint_store: Entity<BreakpointStore>,
sessions: BTreeMap<SessionId, Entity<Session>>,
next_session_id: u32,
@@ -136,39 +138,43 @@ impl DapStore {
http_client,
node_runtime,
toolchain_store,
worktree_store,
language_registry,
locators,
});
Self::new(mode, breakpoint_store, cx)
Self::new(mode, worktree_store, breakpoint_store, cx)
}
pub fn new_ssh(
project_id: u64,
upstream_client: AnyProtoClient,
ssh_client: Entity<SshRemoteClient>,
worktree_store: Entity<WorktreeStore>,
breakpoint_store: Entity<BreakpointStore>,
cx: &mut Context<Self>,
) -> Self {
let upstream_client = ssh_client.read(cx).proto_client();
let mode = DapStoreMode::Ssh(SshDapStore {
upstream_client,
ssh_client,
upstream_project_id: project_id,
});
Self::new(mode, breakpoint_store, cx)
Self::new(mode, worktree_store, breakpoint_store, cx)
}
pub fn new_collab(
_project_id: u64,
_upstream_client: AnyProtoClient,
worktree_store: Entity<WorktreeStore>,
breakpoint_store: Entity<BreakpointStore>,
cx: &mut Context<Self>,
) -> Self {
Self::new(DapStoreMode::Collab, breakpoint_store, cx)
Self::new(DapStoreMode::Collab, worktree_store, breakpoint_store, cx)
}
fn new(
mode: DapStoreMode,
worktree_store: Entity<WorktreeStore>,
breakpoint_store: Entity<BreakpointStore>,
cx: &mut Context<Self>,
) -> Self {
@@ -201,6 +207,7 @@ impl DapStore {
start_debugging_tx,
next_session_id: 0,
downstream_client: None,
worktree_store,
breakpoint_store,
sessions: Default::default(),
}
@@ -211,12 +218,11 @@ impl DapStore {
definition: DebugTaskDefinition,
cx: &mut Context<Self>,
) -> Task<Result<DebugAdapterBinary>> {
let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next() else {
return Task::ready(Err(anyhow!("Failed to find a worktree")));
};
match &self.mode {
DapStoreMode::Local(local) => {
let Some(worktree) = local.worktree_store.read(cx).visible_worktrees(cx).next()
else {
return Task::ready(Err(anyhow!("Failed to find a worktree")));
};
let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else {
return Task::ready(Err(anyhow!("Failed to find a debug adapter")));
};
@@ -261,10 +267,50 @@ impl DapStore {
project_id: ssh.upstream_project_id,
task: Some(definition.to_proto()),
});
let ssh_client = ssh.ssh_client.clone();
cx.background_spawn(async move {
let response = request.await?;
DebugAdapterBinary::from_proto(response)
let binary = DebugAdapterBinary::from_proto(response)?;
let mut ssh_command = ssh_client.update(cx, |ssh, _| {
anyhow::Ok(SshCommand {
arguments: ssh
.ssh_args()
.ok_or_else(|| anyhow!("SSH arguments not found"))?,
})
})??;
let mut connection = None;
if let Some(c) = binary.connection {
let local_bind_addr = Ipv4Addr::new(127, 0, 0, 1);
let port =
dap::transport::TcpTransport::unused_port(local_bind_addr).await?;
ssh_command.add_port_forwarding(port, c.host.to_string(), c.port);
connection = Some(TcpArguments {
port: c.port,
host: local_bind_addr,
timeout: c.timeout,
})
}
let (program, args) = wrap_for_ssh(
&ssh_command,
Some((&binary.command, &binary.arguments)),
binary.cwd.as_deref(),
binary.envs,
None,
);
Ok(DebugAdapterBinary {
command: program,
arguments: args,
envs: HashMap::default(),
cwd: None,
connection,
request_args: binary.request_args.into(),
})
})
}
DapStoreMode::Collab => {
@@ -316,6 +362,133 @@ impl DapStore {
}
}
pub fn start_session(
&mut self,
template: DebugTaskDefinition,
parent_session: Option<Entity<Session>>,
cx: &mut Context<Self>,
) -> Result<Entity<Session>> {
let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next() else {
return Err(anyhow!("Failed to find a worktree"));
};
let session_id = SessionId(util::post_inc(&mut self.next_session_id));
if let Some(session) = &parent_session {
session.update(cx, |session, _| {
session.add_child_session_id(session_id);
});
}
let start_debugging_tx = self.start_debugging_tx.clone();
let (initialized_tx, initialized_rx) = oneshot::channel();
let session = Session::new(
self.breakpoint_store.clone(),
cx.entity().downgrade(),
worktree.downgrade(),
session_id,
parent_session,
template.clone(),
start_debugging_tx,
initialized_tx,
cx,
);
self.sessions.insert(session_id, session.clone());
cx.emit(DapStoreEvent::DebugClientStarted(session_id));
cx.notify();
cx.subscribe(
&session,
move |this: &mut DapStore, session, event: &SessionStateEvent, cx| match event {
SessionStateEvent::Shutdown => {
this.shutdown_session(session_id, cx).detach_and_log_err(cx);
}
SessionStateEvent::Restart => {
let Some((config, binary)) = session.read_with(cx, |session, _| {
session
.configuration()
.map(|config| (config, session.binary().clone()))
}) else {
log::error!("Failed to get debug config from session");
return;
};
let mut curr_session = session;
while let Some(parent_id) = curr_session.read(cx).parent_id() {
if let Some(parent_session) = this.sessions.get(&parent_id).cloned() {
curr_session = parent_session;
} else {
log::error!("Failed to get parent session from parent session id");
break;
}
}
let session_id = curr_session.read(cx).session_id();
let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
let worktree = worktree.clone();
cx.spawn(async move |this, cx| {
task.await;
this.update(cx, |this, cx| {
this.sessions.remove(&session_id);
this.new_session(binary, config, worktree.downgrade(), None, cx)
})?
.1
.await?;
anyhow::Ok(())
})
.detach_and_log_err(cx);
}
},
)
.detach();
Self::boot_session(template, session.clone(), cx);
Ok(session)
}
fn boot_session(
definition: DebugTaskDefinition,
session: Entity<Session>,
cx: &mut Context<Self>,
) {
let session_id = session.read(cx).session_id();
let result = cx.spawn(async move |this, cx| {
let mut binary = this
.update(cx, |this, cx| {
this.get_debug_adapter_binary(definition.clone(), cx)
})?
.await?;
session
.update(cx, |session, cx| session.boot(binary, cx))?
.await
});
cx.spawn(async move |this, cx| match result.await {
Ok(_) => {
this.update(cx, |this, cx| {
cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
})
.ok();
}
Err(error) => {
this.update(cx, |this, cx| {
cx.emit(DapStoreEvent::Notification(error.to_string()));
})
.ok();
session
.update(cx, |session, cx| cx.emit(SessionStateEvent::Shutdown))
.ok();
}
});
}
pub fn add_remote_client(
&mut self,
session_id: SessionId,

View File

@@ -41,7 +41,7 @@ use std::{
path::Path,
sync::Arc,
};
use task::DebugTaskDefinition;
use task::{DebugTaskDefinition, DebugTaskTemplate};
use text::{PointUtf16, ToPointUtf16};
use util::{ResultExt, merge_json_value_into};
use worktree::Worktree;
@@ -117,46 +117,9 @@ impl From<dap::Thread> for Thread {
type UpstreamProjectId = u64;
struct RemoteConnection {
_client: AnyProtoClient,
_upstream_project_id: UpstreamProjectId,
_adapter_name: SharedString,
}
impl RemoteConnection {
fn send_proto_client_request<R: DapCommand>(
&self,
_request: R,
_session_id: SessionId,
cx: &mut App,
) -> Task<Result<R::Response>> {
// let message = request.to_proto(session_id, self.upstream_project_id);
// let upstream_client = self.client.clone();
cx.background_executor().spawn(async move {
// debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
// let response = upstream_client.request(message).await?;
// request.response_from_proto(response)
Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
})
}
fn request<R: DapCommand>(
&self,
request: R,
session_id: SessionId,
cx: &mut App,
) -> Task<Result<R::Response>>
where
<R::DapRequest as dap::requests::Request>::Response: 'static,
<R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
{
return self.send_proto_client_request::<R>(request, session_id, cx);
}
}
enum Mode {
Local(LocalMode),
Remote(RemoteConnection),
Building,
Running(LocalMode),
}
#[derive(Clone)]
@@ -194,30 +157,6 @@ impl LocalMode {
binary: DebugAdapterBinary,
messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
cx: AsyncApp,
) -> Task<Result<Self>> {
Self::new_inner(
session_id,
parent_session,
breakpoint_store,
worktree,
config,
binary,
messages_tx,
async |_, _| {},
cx,
)
}
fn new_inner(
session_id: SessionId,
parent_session: Option<Entity<Session>>,
breakpoint_store: Entity<BreakpointStore>,
worktree: WeakEntity<Worktree>,
config: DebugTaskDefinition,
binary: DebugAdapterBinary,
messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
cx: AsyncApp,
) -> Task<Result<Self>> {
cx.spawn(async move |cx| {
let message_handler = Box::new(move |message| {
@@ -244,18 +183,14 @@ impl LocalMode {
},
);
let mut session = Self {
Ok(Self {
client,
breakpoint_store,
worktree,
tmp_breakpoint: None,
definition: config,
binary,
};
on_initialized(&mut session, cx.clone()).await;
Ok(session)
})
})
}
@@ -533,7 +468,7 @@ impl LocalMode {
}
impl From<RemoteConnection> for Mode {
fn from(value: RemoteConnection) -> Self {
Self::Remote(value)
Self::Building(value)
}
}
@@ -549,10 +484,10 @@ impl Mode {
<R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
{
match self {
Mode::Local(debug_adapter_client) => {
Mode::Running(debug_adapter_client) => {
debug_adapter_client.request(request, cx.background_executor().clone())
}
Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
Mode::Building(remote_connection) => remote_connection.request(request, session_id, cx),
}
}
}
@@ -632,7 +567,7 @@ pub struct Session {
pub(super) capabilities: Capabilities,
id: SessionId,
child_session_ids: HashSet<SessionId>,
parent_id: Option<SessionId>,
parent_session: Option<SessionId>,
ignore_breakpoints: bool,
modules: Vec<dap::Module>,
loaded_sources: Vec<dap::Source>,
@@ -646,6 +581,8 @@ pub struct Session {
is_session_terminated: bool,
requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
initialized_tx: oneshot::Sender<()>,
_background_tasks: Vec<Task<()>>,
}
@@ -742,78 +679,96 @@ impl EventEmitter<SessionStateEvent> for Session {}
// remote side will only send breakpoint updates when it is a breakpoint created by that peer
// BreakpointStore notifies session on breakpoint changes
impl Session {
pub(crate) fn local(
pub(crate) fn new(
breakpoint_store: Entity<BreakpointStore>,
dap_store: WeakEntity<DapStore>,
worktree: WeakEntity<Worktree>,
session_id: SessionId,
parent_session: Option<Entity<Session>>,
binary: DebugAdapterBinary,
config: DebugTaskDefinition,
template: DebugTaskTemplate,
start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
initialized_tx: oneshot::Sender<()>,
cx: &mut App,
) -> Task<Result<Entity<Self>>> {
let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
) -> Entity<Self> {
cx.new(|cx| {
let this = Self {
mode: Mode::Building,
id: session_id,
child_session_ids: HashSet::default(),
parent_session: parent_session
capabilities: Capabilities::default(),
ignore_breakpoints: false,
variables: Default::default(),
stack_frames: Default::default(),
thread_states: ThreadStates::default(),
output_token: OutputToken(0),
output: circular_buffer::CircularBuffer::boxed(),
requests: HashMap::default(),
modules: Vec::default(),
loaded_sources: Vec::default(),
threads: IndexMap::default(),
_background_tasks: Vec::default(),
locations: Default::default(),
is_session_terminated: false,
exception_breakpoints: Default::default(),
start_debugging_requests_tx,
initialized_tx,
};
cx.spawn(async move |cx| {
let mode = LocalMode::new(
session_id,
parent_session.clone(),
worktree,
breakpoint_store.clone(),
config.clone(),
binary,
message_tx,
cx.clone(),
)
.await?;
cx.new(|cx| {
create_local_session(
breakpoint_store,
session_id,
parent_session,
start_debugging_requests_tx,
initialized_tx,
message_rx,
mode,
cx,
)
})
this
})
}
pub(crate) fn remote(
session_id: SessionId,
client: AnyProtoClient,
upstream_project_id: u64,
ignore_breakpoints: bool,
) -> Self {
Self {
mode: Mode::Remote(RemoteConnection {
_adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
_client: client,
_upstream_project_id: upstream_project_id,
}),
id: session_id,
child_session_ids: HashSet::default(),
parent_id: None,
capabilities: Capabilities::default(),
ignore_breakpoints,
variables: Default::default(),
stack_frames: Default::default(),
thread_states: ThreadStates::default(),
output_token: OutputToken(0),
output: circular_buffer::CircularBuffer::boxed(),
requests: HashMap::default(),
modules: Vec::default(),
loaded_sources: Vec::default(),
threads: IndexMap::default(),
_background_tasks: Vec::default(),
locations: Default::default(),
is_session_terminated: false,
exception_breakpoints: Default::default(),
}
pub fn boot(&mut self, binary: DebugAdapterBinary, cx: &mut Context<Self>) -> Task<Result<()>> {
let mode = LocalMode::new(
self.id,
self.parent_session.clone(),
parent_session.clone(),
worktree,
breakpoint_store.clone(),
config.clone(),
binary,
message_tx,
cx.clone(),
)
.await?;
create_local_session(
breakpoint_store,
session_id,
parent_session,
start_debugging_requests_tx,
initialized_tx,
message_rx,
mode,
cx,
);
self.mode = Mode::Running(())
// let seq_result = async || {
// session
// .update(cx, |session, cx| session.request_initialize(cx))?
// .await?;
// session
// .update(cx, |session, cx| {
// session.initialize_sequence(initialized_rx, this.clone(), cx)
// })?
// .await
// };
// match seq_result().await {
// Ok(_) => {}
// Err(error) => {
// this.update(cx, |this, cx| {
// cx.emit(DapStoreEvent::Notification(error.to_string()));
// this.shutdown_session(session_id, cx)
// })?
// .await
// .log_err();
// return Err(error);
// }
// }
}
pub fn session_id(&self) -> SessionId {
@@ -833,7 +788,7 @@ impl Session {
}
pub fn parent_id(&self) -> Option<SessionId> {
self.parent_id
self.parent_session
}
pub fn capabilities(&self) -> &Capabilities {
@@ -841,7 +796,7 @@ impl Session {
}
pub fn binary(&self) -> &DebugAdapterBinary {
let Mode::Local(local_mode) = &self.mode else {
let Mode::Running(local_mode) = &self.mode else {
panic!("Session is not local");
};
&local_mode.binary
@@ -849,13 +804,13 @@ impl Session {
pub fn adapter_name(&self) -> SharedString {
match &self.mode {
Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
Mode::Running(local_mode) => local_mode.definition.adapter.clone().into(),
Mode::Building(remote_mode) => remote_mode._adapter_name.clone(),
}
}
pub fn configuration(&self) -> Option<DebugTaskDefinition> {
if let Mode::Local(local_mode) = &self.mode {
if let Mode::Running(local_mode) = &self.mode {
Some(local_mode.definition.clone())
} else {
None
@@ -867,26 +822,26 @@ impl Session {
}
pub fn is_local(&self) -> bool {
matches!(self.mode, Mode::Local(_))
matches!(self.mode, Mode::Running(_))
}
pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
match &mut self.mode {
Mode::Local(local_mode) => Some(local_mode),
Mode::Remote(_) => None,
Mode::Running(local_mode) => Some(local_mode),
Mode::Building(_) => None,
}
}
pub fn as_local(&self) -> Option<&LocalMode> {
match &self.mode {
Mode::Local(local_mode) => Some(local_mode),
Mode::Remote(_) => None,
Mode::Running(local_mode) => Some(local_mode),
Mode::Building(_) => None,
}
}
pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
match &self.mode {
Mode::Local(local_mode) => {
Mode::Running(local_mode) => {
let capabilities = local_mode.clone().request_initialization(cx);
cx.spawn(async move |this, cx| {
@@ -909,7 +864,7 @@ impl Session {
Ok(())
})
}
Mode::Remote(_) => Task::ready(Err(anyhow!(
Mode::Building(_) => Task::ready(Err(anyhow!(
"Cannot send initialize request from remote session"
))),
}
@@ -922,10 +877,10 @@ impl Session {
cx: &mut Context<Self>,
) -> Task<Result<()>> {
match &self.mode {
Mode::Local(local_mode) => {
Mode::Running(local_mode) => {
local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
}
Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
Mode::Building(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
}
}
@@ -936,7 +891,7 @@ impl Session {
cx: &mut Context<Self>,
) {
match &mut self.mode {
Mode::Local(local_mode) => {
Mode::Running(local_mode) => {
if !matches!(
self.thread_states.thread_state(active_thread_id),
Some(ThreadStatus::Stopped)
@@ -959,7 +914,7 @@ impl Session {
})
.detach();
}
Mode::Remote(_) => {}
Mode::Building(_) => {}
}
}
@@ -1579,8 +1534,8 @@ impl Session {
pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
match self.mode {
Mode::Local(ref local) => Some(local.client.clone()),
Mode::Remote(_) => None,
Mode::Running(ref local) => Some(local.client.clone()),
Mode::Building(_) => None,
}
}
@@ -2005,10 +1960,10 @@ fn create_local_session(
.detach();
Session {
mode: Mode::Local(mode),
mode: Mode::Running(mode),
id: session_id,
child_session_ids: HashSet::default(),
parent_id: parent_session.map(|session| session.read(cx).id),
parent_session: parent_session.map(|session| session.read(cx).id),
variables: Default::default(),
capabilities: Capabilities::default(),
thread_states: ThreadStates::default(),

View File

@@ -1070,12 +1070,7 @@ impl Project {
cx.new(|_| BreakpointStore::remote(SSH_PROJECT_ID, ssh_proto.clone().into()));
let dap_store = cx.new(|cx| {
DapStore::new_ssh(
SSH_PROJECT_ID,
ssh_proto.clone(),
breakpoint_store.clone(),
cx,
)
DapStore::new_ssh(SSH_PROJECT_ID, ssh.clone(), breakpoint_store.clone(), cx)
});
let git_store = cx.new(|cx| {