From 59f99b062cdf5e38f91bb8904e19f4ebda9db49e Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 18 Apr 2025 16:41:23 -0600 Subject: [PATCH] Begin more refactoring --- crates/debugger_ui/src/debugger_panel.rs | 27 --- crates/project/src/debugger/dap_store.rs | 195 ++++++++++++++++- crates/project/src/debugger/session.rs | 265 ++++++++++------------- crates/project/src/project.rs | 7 +- 4 files changed, 295 insertions(+), 199 deletions(-) diff --git a/crates/debugger_ui/src/debugger_panel.rs b/crates/debugger_ui/src/debugger_panel.rs index 41ee08bd2f..03c3ad3276 100644 --- a/crates/debugger_ui/src/debugger_panel.rs +++ b/crates/debugger_ui/src/debugger_panel.rs @@ -302,33 +302,6 @@ impl DebugPanel { self.active_session.clone() } - pub fn debug_panel_items_by_client( - &self, - client_id: &SessionId, - cx: &Context, - ) -> Vec> { - self.sessions - .iter() - .filter(|item| item.read(cx).session_id(cx) == *client_id) - .map(|item| item.clone()) - .collect() - } - - pub fn debug_panel_item_by_client( - &self, - client_id: SessionId, - cx: &mut Context, - ) -> Option> { - self.sessions - .iter() - .find(|item| { - let item = item.read(cx); - - item.session_id(cx) == client_id - }) - .cloned() - } - fn handle_dap_store_event( &mut self, dap_store: &Entity, diff --git a/crates/project/src/debugger/dap_store.rs b/crates/project/src/debugger/dap_store.rs index eb3524a77b..6e29db9605 100644 --- a/crates/project/src/debugger/dap_store.rs +++ b/crates/project/src/debugger/dap_store.rs @@ -31,6 +31,7 @@ use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore}; use lsp::LanguageServerName; use node_runtime::NodeRuntime; +use remote::SshRemoteClient; use rpc::{ AnyProtoClient, TypedEnvelope, proto::{self}, @@ -83,19 +84,20 @@ pub struct LocalDapStore { http_client: Arc, environment: Entity, language_registry: Arc, - worktree_store: Entity, toolchain_store: Arc, locators: HashMap>, } pub struct SshDapStore { upstream_client: AnyProtoClient, + ssh_client: Entity, upstream_project_id: u64, } pub struct DapStore { mode: DapStoreMode, downstream_client: Option<(AnyProtoClient, u64)>, + worktree_store: Entity, breakpoint_store: Entity, sessions: BTreeMap>, next_session_id: u32, @@ -136,39 +138,43 @@ impl DapStore { http_client, node_runtime, toolchain_store, - worktree_store, language_registry, locators, }); - Self::new(mode, breakpoint_store, cx) + Self::new(mode, worktree_store, breakpoint_store, cx) } pub fn new_ssh( project_id: u64, - upstream_client: AnyProtoClient, + ssh_client: Entity, + worktree_store: Entity, breakpoint_store: Entity, cx: &mut Context, ) -> Self { + let upstream_client = ssh_client.read(cx).proto_client(); let mode = DapStoreMode::Ssh(SshDapStore { upstream_client, + ssh_client, upstream_project_id: project_id, }); - Self::new(mode, breakpoint_store, cx) + Self::new(mode, worktree_store, breakpoint_store, cx) } pub fn new_collab( _project_id: u64, _upstream_client: AnyProtoClient, + worktree_store: Entity, breakpoint_store: Entity, cx: &mut Context, ) -> Self { - Self::new(DapStoreMode::Collab, breakpoint_store, cx) + Self::new(DapStoreMode::Collab, worktree_store, breakpoint_store, cx) } fn new( mode: DapStoreMode, + worktree_store: Entity, breakpoint_store: Entity, cx: &mut Context, ) -> Self { @@ -201,6 +207,7 @@ impl DapStore { start_debugging_tx, next_session_id: 0, downstream_client: None, + worktree_store, breakpoint_store, sessions: Default::default(), } @@ -211,12 +218,11 @@ impl DapStore { definition: DebugTaskDefinition, cx: &mut Context, ) -> Task> { + let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next() else { + return Task::ready(Err(anyhow!("Failed to find a worktree"))); + }; match &self.mode { DapStoreMode::Local(local) => { - let Some(worktree) = local.worktree_store.read(cx).visible_worktrees(cx).next() - else { - return Task::ready(Err(anyhow!("Failed to find a worktree"))); - }; let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else { return Task::ready(Err(anyhow!("Failed to find a debug adapter"))); }; @@ -261,10 +267,50 @@ impl DapStore { project_id: ssh.upstream_project_id, task: Some(definition.to_proto()), }); + let ssh_client = ssh.ssh_client.clone(); cx.background_spawn(async move { let response = request.await?; - DebugAdapterBinary::from_proto(response) + let binary = DebugAdapterBinary::from_proto(response)?; + + let mut ssh_command = ssh_client.update(cx, |ssh, _| { + anyhow::Ok(SshCommand { + arguments: ssh + .ssh_args() + .ok_or_else(|| anyhow!("SSH arguments not found"))?, + }) + })??; + + let mut connection = None; + if let Some(c) = binary.connection { + let local_bind_addr = Ipv4Addr::new(127, 0, 0, 1); + let port = + dap::transport::TcpTransport::unused_port(local_bind_addr).await?; + + ssh_command.add_port_forwarding(port, c.host.to_string(), c.port); + connection = Some(TcpArguments { + port: c.port, + host: local_bind_addr, + timeout: c.timeout, + }) + } + + let (program, args) = wrap_for_ssh( + &ssh_command, + Some((&binary.command, &binary.arguments)), + binary.cwd.as_deref(), + binary.envs, + None, + ); + + Ok(DebugAdapterBinary { + command: program, + arguments: args, + envs: HashMap::default(), + cwd: None, + connection, + request_args: binary.request_args.into(), + }) }) } DapStoreMode::Collab => { @@ -316,6 +362,133 @@ impl DapStore { } } + pub fn start_session( + &mut self, + template: DebugTaskDefinition, + parent_session: Option>, + cx: &mut Context, + ) -> Result> { + let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next() else { + return Err(anyhow!("Failed to find a worktree")); + }; + let session_id = SessionId(util::post_inc(&mut self.next_session_id)); + + if let Some(session) = &parent_session { + session.update(cx, |session, _| { + session.add_child_session_id(session_id); + }); + } + + let start_debugging_tx = self.start_debugging_tx.clone(); + + let (initialized_tx, initialized_rx) = oneshot::channel(); + + let session = Session::new( + self.breakpoint_store.clone(), + cx.entity().downgrade(), + worktree.downgrade(), + session_id, + parent_session, + template.clone(), + start_debugging_tx, + initialized_tx, + cx, + ); + + self.sessions.insert(session_id, session.clone()); + cx.emit(DapStoreEvent::DebugClientStarted(session_id)); + cx.notify(); + + cx.subscribe( + &session, + move |this: &mut DapStore, session, event: &SessionStateEvent, cx| match event { + SessionStateEvent::Shutdown => { + this.shutdown_session(session_id, cx).detach_and_log_err(cx); + } + SessionStateEvent::Restart => { + let Some((config, binary)) = session.read_with(cx, |session, _| { + session + .configuration() + .map(|config| (config, session.binary().clone())) + }) else { + log::error!("Failed to get debug config from session"); + return; + }; + + let mut curr_session = session; + while let Some(parent_id) = curr_session.read(cx).parent_id() { + if let Some(parent_session) = this.sessions.get(&parent_id).cloned() { + curr_session = parent_session; + } else { + log::error!("Failed to get parent session from parent session id"); + break; + } + } + + let session_id = curr_session.read(cx).session_id(); + + let task = curr_session.update(cx, |session, cx| session.shutdown(cx)); + + let worktree = worktree.clone(); + cx.spawn(async move |this, cx| { + task.await; + + this.update(cx, |this, cx| { + this.sessions.remove(&session_id); + this.new_session(binary, config, worktree.downgrade(), None, cx) + })? + .1 + .await?; + + anyhow::Ok(()) + }) + .detach_and_log_err(cx); + } + }, + ) + .detach(); + + Self::boot_session(template, session.clone(), cx); + Ok(session) + } + + fn boot_session( + definition: DebugTaskDefinition, + session: Entity, + cx: &mut Context, + ) { + let session_id = session.read(cx).session_id(); + + let result = cx.spawn(async move |this, cx| { + let mut binary = this + .update(cx, |this, cx| { + this.get_debug_adapter_binary(definition.clone(), cx) + })? + .await?; + + session + .update(cx, |session, cx| session.boot(binary, cx))? + .await + }); + cx.spawn(async move |this, cx| match result.await { + Ok(_) => { + this.update(cx, |this, cx| { + cx.emit(DapStoreEvent::DebugSessionInitialized(session_id)); + }) + .ok(); + } + Err(error) => { + this.update(cx, |this, cx| { + cx.emit(DapStoreEvent::Notification(error.to_string())); + }) + .ok(); + session + .update(cx, |session, cx| cx.emit(SessionStateEvent::Shutdown)) + .ok(); + } + }); + } + pub fn add_remote_client( &mut self, session_id: SessionId, diff --git a/crates/project/src/debugger/session.rs b/crates/project/src/debugger/session.rs index 0783b85c4f..4bc8de8ffb 100644 --- a/crates/project/src/debugger/session.rs +++ b/crates/project/src/debugger/session.rs @@ -41,7 +41,7 @@ use std::{ path::Path, sync::Arc, }; -use task::DebugTaskDefinition; +use task::{DebugTaskDefinition, DebugTaskTemplate}; use text::{PointUtf16, ToPointUtf16}; use util::{ResultExt, merge_json_value_into}; use worktree::Worktree; @@ -117,46 +117,9 @@ impl From for Thread { type UpstreamProjectId = u64; -struct RemoteConnection { - _client: AnyProtoClient, - _upstream_project_id: UpstreamProjectId, - _adapter_name: SharedString, -} - -impl RemoteConnection { - fn send_proto_client_request( - &self, - _request: R, - _session_id: SessionId, - cx: &mut App, - ) -> Task> { - // let message = request.to_proto(session_id, self.upstream_project_id); - // let upstream_client = self.client.clone(); - cx.background_executor().spawn(async move { - // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again - // let response = upstream_client.request(message).await?; - // request.response_from_proto(response) - Err(anyhow!("Sending dap commands over RPC isn't supported yet")) - }) - } - - fn request( - &self, - request: R, - session_id: SessionId, - cx: &mut App, - ) -> Task> - where - ::Response: 'static, - ::Arguments: 'static + Send, - { - return self.send_proto_client_request::(request, session_id, cx); - } -} - enum Mode { - Local(LocalMode), - Remote(RemoteConnection), + Building, + Running(LocalMode), } #[derive(Clone)] @@ -194,30 +157,6 @@ impl LocalMode { binary: DebugAdapterBinary, messages_tx: futures::channel::mpsc::UnboundedSender, cx: AsyncApp, - ) -> Task> { - Self::new_inner( - session_id, - parent_session, - breakpoint_store, - worktree, - config, - binary, - messages_tx, - async |_, _| {}, - cx, - ) - } - - fn new_inner( - session_id: SessionId, - parent_session: Option>, - breakpoint_store: Entity, - worktree: WeakEntity, - config: DebugTaskDefinition, - binary: DebugAdapterBinary, - messages_tx: futures::channel::mpsc::UnboundedSender, - on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static, - cx: AsyncApp, ) -> Task> { cx.spawn(async move |cx| { let message_handler = Box::new(move |message| { @@ -244,18 +183,14 @@ impl LocalMode { }, ); - let mut session = Self { + Ok(Self { client, breakpoint_store, worktree, tmp_breakpoint: None, definition: config, binary, - }; - - on_initialized(&mut session, cx.clone()).await; - - Ok(session) + }) }) } @@ -533,7 +468,7 @@ impl LocalMode { } impl From for Mode { fn from(value: RemoteConnection) -> Self { - Self::Remote(value) + Self::Building(value) } } @@ -549,10 +484,10 @@ impl Mode { ::Arguments: 'static + Send, { match self { - Mode::Local(debug_adapter_client) => { + Mode::Running(debug_adapter_client) => { debug_adapter_client.request(request, cx.background_executor().clone()) } - Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx), + Mode::Building(remote_connection) => remote_connection.request(request, session_id, cx), } } } @@ -632,7 +567,7 @@ pub struct Session { pub(super) capabilities: Capabilities, id: SessionId, child_session_ids: HashSet, - parent_id: Option, + parent_session: Option, ignore_breakpoints: bool, modules: Vec, loaded_sources: Vec, @@ -646,6 +581,8 @@ pub struct Session { is_session_terminated: bool, requests: HashMap>>>>, exception_breakpoints: BTreeMap, + start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>, + initialized_tx: oneshot::Sender<()>, _background_tasks: Vec>, } @@ -742,78 +679,96 @@ impl EventEmitter for Session {} // remote side will only send breakpoint updates when it is a breakpoint created by that peer // BreakpointStore notifies session on breakpoint changes impl Session { - pub(crate) fn local( + pub(crate) fn new( breakpoint_store: Entity, + dap_store: WeakEntity, worktree: WeakEntity, session_id: SessionId, parent_session: Option>, - binary: DebugAdapterBinary, - config: DebugTaskDefinition, + template: DebugTaskTemplate, start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>, initialized_tx: oneshot::Sender<()>, cx: &mut App, - ) -> Task>> { - let (message_tx, message_rx) = futures::channel::mpsc::unbounded(); + ) -> Entity { + cx.new(|cx| { + let this = Self { + mode: Mode::Building, + id: session_id, + child_session_ids: HashSet::default(), + parent_session: parent_session + capabilities: Capabilities::default(), + ignore_breakpoints: false, + variables: Default::default(), + stack_frames: Default::default(), + thread_states: ThreadStates::default(), + output_token: OutputToken(0), + output: circular_buffer::CircularBuffer::boxed(), + requests: HashMap::default(), + modules: Vec::default(), + loaded_sources: Vec::default(), + threads: IndexMap::default(), + _background_tasks: Vec::default(), + locations: Default::default(), + is_session_terminated: false, + exception_breakpoints: Default::default(), + start_debugging_requests_tx, + initialized_tx, + }; - cx.spawn(async move |cx| { - let mode = LocalMode::new( - session_id, - parent_session.clone(), - worktree, - breakpoint_store.clone(), - config.clone(), - binary, - message_tx, - cx.clone(), - ) - .await?; - - cx.new(|cx| { - create_local_session( - breakpoint_store, - session_id, - parent_session, - start_debugging_requests_tx, - initialized_tx, - message_rx, - mode, - cx, - ) - }) + this }) } - pub(crate) fn remote( - session_id: SessionId, - client: AnyProtoClient, - upstream_project_id: u64, - ignore_breakpoints: bool, - ) -> Self { - Self { - mode: Mode::Remote(RemoteConnection { - _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout - _client: client, - _upstream_project_id: upstream_project_id, - }), - id: session_id, - child_session_ids: HashSet::default(), - parent_id: None, - capabilities: Capabilities::default(), - ignore_breakpoints, - variables: Default::default(), - stack_frames: Default::default(), - thread_states: ThreadStates::default(), - output_token: OutputToken(0), - output: circular_buffer::CircularBuffer::boxed(), - requests: HashMap::default(), - modules: Vec::default(), - loaded_sources: Vec::default(), - threads: IndexMap::default(), - _background_tasks: Vec::default(), - locations: Default::default(), - is_session_terminated: false, - exception_breakpoints: Default::default(), - } + pub fn boot(&mut self, binary: DebugAdapterBinary, cx: &mut Context) -> Task> { + let mode = LocalMode::new( + self.id, + self.parent_session.clone(), + parent_session.clone(), + worktree, + breakpoint_store.clone(), + config.clone(), + binary, + message_tx, + cx.clone(), + ) + .await?; + + create_local_session( + breakpoint_store, + session_id, + parent_session, + start_debugging_requests_tx, + initialized_tx, + message_rx, + mode, + cx, + ); + + self.mode = Mode::Running(()) + // let seq_result = async || { + // session + // .update(cx, |session, cx| session.request_initialize(cx))? + // .await?; + + // session + // .update(cx, |session, cx| { + // session.initialize_sequence(initialized_rx, this.clone(), cx) + // })? + // .await + // }; + // match seq_result().await { + // Ok(_) => {} + // Err(error) => { + // this.update(cx, |this, cx| { + // cx.emit(DapStoreEvent::Notification(error.to_string())); + // this.shutdown_session(session_id, cx) + // })? + // .await + // .log_err(); + + // return Err(error); + // } + // } } pub fn session_id(&self) -> SessionId { @@ -833,7 +788,7 @@ impl Session { } pub fn parent_id(&self) -> Option { - self.parent_id + self.parent_session } pub fn capabilities(&self) -> &Capabilities { @@ -841,7 +796,7 @@ impl Session { } pub fn binary(&self) -> &DebugAdapterBinary { - let Mode::Local(local_mode) = &self.mode else { + let Mode::Running(local_mode) = &self.mode else { panic!("Session is not local"); }; &local_mode.binary @@ -849,13 +804,13 @@ impl Session { pub fn adapter_name(&self) -> SharedString { match &self.mode { - Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(), - Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(), + Mode::Running(local_mode) => local_mode.definition.adapter.clone().into(), + Mode::Building(remote_mode) => remote_mode._adapter_name.clone(), } } pub fn configuration(&self) -> Option { - if let Mode::Local(local_mode) = &self.mode { + if let Mode::Running(local_mode) = &self.mode { Some(local_mode.definition.clone()) } else { None @@ -867,26 +822,26 @@ impl Session { } pub fn is_local(&self) -> bool { - matches!(self.mode, Mode::Local(_)) + matches!(self.mode, Mode::Running(_)) } pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> { match &mut self.mode { - Mode::Local(local_mode) => Some(local_mode), - Mode::Remote(_) => None, + Mode::Running(local_mode) => Some(local_mode), + Mode::Building(_) => None, } } pub fn as_local(&self) -> Option<&LocalMode> { match &self.mode { - Mode::Local(local_mode) => Some(local_mode), - Mode::Remote(_) => None, + Mode::Running(local_mode) => Some(local_mode), + Mode::Building(_) => None, } } pub(super) fn request_initialize(&mut self, cx: &mut Context) -> Task> { match &self.mode { - Mode::Local(local_mode) => { + Mode::Running(local_mode) => { let capabilities = local_mode.clone().request_initialization(cx); cx.spawn(async move |this, cx| { @@ -909,7 +864,7 @@ impl Session { Ok(()) }) } - Mode::Remote(_) => Task::ready(Err(anyhow!( + Mode::Building(_) => Task::ready(Err(anyhow!( "Cannot send initialize request from remote session" ))), } @@ -922,10 +877,10 @@ impl Session { cx: &mut Context, ) -> Task> { match &self.mode { - Mode::Local(local_mode) => { + Mode::Running(local_mode) => { local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx) } - Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))), + Mode::Building(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))), } } @@ -936,7 +891,7 @@ impl Session { cx: &mut Context, ) { match &mut self.mode { - Mode::Local(local_mode) => { + Mode::Running(local_mode) => { if !matches!( self.thread_states.thread_state(active_thread_id), Some(ThreadStatus::Stopped) @@ -959,7 +914,7 @@ impl Session { }) .detach(); } - Mode::Remote(_) => {} + Mode::Building(_) => {} } } @@ -1579,8 +1534,8 @@ impl Session { pub fn adapter_client(&self) -> Option> { match self.mode { - Mode::Local(ref local) => Some(local.client.clone()), - Mode::Remote(_) => None, + Mode::Running(ref local) => Some(local.client.clone()), + Mode::Building(_) => None, } } @@ -2005,10 +1960,10 @@ fn create_local_session( .detach(); Session { - mode: Mode::Local(mode), + mode: Mode::Running(mode), id: session_id, child_session_ids: HashSet::default(), - parent_id: parent_session.map(|session| session.read(cx).id), + parent_session: parent_session.map(|session| session.read(cx).id), variables: Default::default(), capabilities: Capabilities::default(), thread_states: ThreadStates::default(), diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 6fcbc29ffe..5230ab12c4 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1070,12 +1070,7 @@ impl Project { cx.new(|_| BreakpointStore::remote(SSH_PROJECT_ID, ssh_proto.clone().into())); let dap_store = cx.new(|cx| { - DapStore::new_ssh( - SSH_PROJECT_ID, - ssh_proto.clone(), - breakpoint_store.clone(), - cx, - ) + DapStore::new_ssh(SSH_PROJECT_ID, ssh.clone(), breakpoint_store.clone(), cx) }); let git_store = cx.new(|cx| {