This commit is contained in:
Piotr Osiewicz
2025-02-17 16:51:39 +01:00
parent 9786da2fe5
commit 95590967eb
12 changed files with 245 additions and 155 deletions

View File

@@ -1,6 +1,5 @@
#[cfg(any(test, feature = "test-support"))]
use crate::transport::FakeTransport;
use crate::transport::Transport;
use ::fs::Fs;
use anyhow::{anyhow, Context as _, Ok, Result};
use async_compression::futures::bufread::GzipDecoder;
@@ -19,9 +18,11 @@ use std::{
collections::{HashMap, HashSet},
ffi::{OsStr, OsString},
fmt::Debug,
net::Ipv4Addr,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use sysinfo::{Pid, Process};
use task::DebugAdapterConfig;
@@ -89,12 +90,19 @@ impl<'a> From<&'a str> for DebugAdapterName {
}
}
#[derive(Debug, Clone)]
pub struct TcpArguments {
pub host: Ipv4Addr,
pub port: Option<u16>,
pub timeout: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct DebugAdapterBinary {
pub command: String,
pub arguments: Option<Vec<OsString>>,
pub envs: Option<HashMap<String, String>>,
pub cwd: Option<PathBuf>,
pub connection: Option<TcpArguments>,
}
pub struct AdapterVersion {
@@ -261,8 +269,6 @@ pub trait DebugAdapter: 'static + Send + Sync {
.await
}
fn transport(&self) -> Arc<dyn Transport>;
async fn fetch_latest_adapter_version(
&self,
delegate: &dyn DapDelegate,
@@ -316,10 +322,6 @@ impl DebugAdapter for FakeAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(FakeTransport::new())
}
async fn get_binary(
&self,
_: &dyn DapDelegate,
@@ -330,6 +332,11 @@ impl DebugAdapter for FakeAdapter {
Ok(DebugAdapterBinary {
command: "command".into(),
arguments: None,
connection: Some(TcpArguments {
host: Ipv4Addr::LOCALHOST,
port: None,
timeout: None,
}),
envs: None,
cwd: None,
})

View File

@@ -45,27 +45,52 @@ pub struct DebugAdapterClient {
sequence_count: AtomicU64,
binary: DebugAdapterBinary,
executor: BackgroundExecutor,
adapter: Arc<dyn DebugAdapter>,
transport_delegate: TransportDelegate,
}
impl DebugAdapterClient {
pub fn new(
id: DebugAdapterClientId,
adapter: Arc<dyn DebugAdapter>,
binary: DebugAdapterBinary,
cx: &AsyncApp,
) -> Self {
let transport_delegate = TransportDelegate::new(adapter.transport());
pub fn new(cx: &AsyncApp) -> Self {}
Self {
pub async fn start<F>(
id: DebugAdapterClientId,
binary: DebugAdapterBinary,
message_handler: F,
cx: &mut AsyncApp,
) -> Result<()>
where
F: FnMut(Message, &mut App) + 'static + Send + Sync + Clone,
{
let transport_delegate = TransportDelegate::new(binary.clone());
let this = Self {
id,
binary,
adapter,
transport_delegate,
sequence_count: AtomicU64::new(1),
executor: cx.background_executor().clone(),
}
};
let (server_rx, server_tx) = this.transport_delegate.start(&self.binary, cx).await?;
log::info!("Successfully connected to debug adapter");
let client_id = self.id;
// start handling events/reverse requests
cx.update(|cx| {
cx.spawn({
let server_tx = server_tx.clone();
|mut cx| async move {
Self::handle_receive_messages(
client_id,
server_rx,
server_tx,
message_handler,
&mut cx,
)
.await
}
})
.detach_and_log_err(cx);
})
}
pub async fn reconnect<F>(&mut self, message_handler: F, cx: &mut AsyncApp) -> Result<()>
@@ -95,35 +120,6 @@ impl DebugAdapterClient {
.detach_and_log_err(cx);
})
}
pub async fn start<F>(&mut self, message_handler: F, cx: &mut AsyncApp) -> Result<()>
where
F: FnMut(Message, &mut App) + 'static + Send + Sync + Clone,
{
let (server_rx, server_tx) = self.transport_delegate.start(&self.binary, cx).await?;
log::info!("Successfully connected to debug adapter");
let client_id = self.id;
// start handling events/reverse requests
cx.update(|cx| {
cx.spawn({
let server_tx = server_tx.clone();
|mut cx| async move {
Self::handle_receive_messages(
client_id,
server_rx,
server_tx,
message_handler,
&mut cx,
)
.await
}
})
.detach_and_log_err(cx);
})
}
async fn handle_receive_messages<F>(
client_id: DebugAdapterClientId,
server_rx: Receiver<Message>,
@@ -229,18 +225,10 @@ impl DebugAdapterClient {
self.id
}
pub fn adapter(&self) -> &Arc<dyn DebugAdapter> {
&self.adapter
}
pub fn binary(&self) -> &DebugAdapterBinary {
&self.binary
}
pub fn adapter_id(&self) -> String {
self.adapter.name().to_string()
}
/// Get the next sequence id to be used in a request
pub fn next_sequence_id(&self) -> u64 {
self.sequence_count.fetch_add(1, Ordering::Relaxed)
@@ -335,15 +323,17 @@ mod tests {
pub async fn test_initialize_client(cx: &mut TestAppContext) {
init_test(cx);
let adapter = Arc::new(FakeAdapter::new());
let mut client = DebugAdapterClient::new(
crate::client::DebugAdapterClientId(1),
adapter,
DebugAdapterBinary {
command: "command".into(),
arguments: Default::default(),
envs: Default::default(),
connection: Some(TcpArguments {
host: Ipv4Addr::LOCALHOST,
port: None,
timeout: None,
}),
cwd: None,
},
&mut cx.to_async(),
@@ -412,11 +402,15 @@ mod tests {
let mut client = DebugAdapterClient::new(
crate::client::DebugAdapterClientId(1),
adapter,
DebugAdapterBinary {
command: "command".into(),
arguments: Default::default(),
envs: Default::default(),
connection: Some(TCPArguments {
host: Ipv4Addr::LOCALHOST,
port: None,
path: None,
}),
cwd: None,
},
&mut cx.to_async(),
@@ -467,11 +461,16 @@ mod tests {
let mut client = DebugAdapterClient::new(
crate::client::DebugAdapterClientId(1),
adapter,
DebugAdapterBinary {
command: "command".into(),
arguments: Default::default(),
envs: Default::default(),
connection: Some(TCPArguments {
host: Ipv4Addr::LOCALHOST,
port: None,
path: None,
}),
cwd: None,
},
&mut cx.to_async(),

View File

@@ -68,18 +68,88 @@ impl TransportPipe {
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
enum Transport {
Stdio(StdioTransport),
Tcp(TcpTransport),
#[cfg(any(test, feature = "test-support"))]
Fake(FakeTransport),
}
impl Transport {
async fn start(&self, binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<TransportPipe> {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.start(binary, cx).await,
Transport::Tcp(tcp_transport) => tcp_transport.start(binary, cx).await,
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.start(binary, cx).await,
}
}
fn has_adapter_logs(&self) -> bool {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
}
}
async fn kill(&self) -> Result<()> {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.kill().await,
}
}
async fn reconnect(&self, cx: &mut AsyncApp) -> Result<TransportPipe> {
match self {
Transport::Stdio(stdio_transport) => stdio_transport.reconnect(cx).await,
Transport::Tcp(tcp_transport) => tcp_transport.reconnect(cx).await,
#[cfg(any(test, feature = "test-support"))]
Transport::Fake(fake_transport) => fake_transport.reconnect(cx).await,
}
}
#[cfg(any(test, feature = "test-support"))]
fn as_fake(&self) -> &FakeTransport {
match self {
Transport::Fake(fake_transport) => fake_transport,
_ => panic!("Not a fake transport layer"),
}
}
}
pub(crate) struct TransportDelegate {
log_handlers: LogHandlers,
current_requests: Requests,
pending_requests: Requests,
transport: Arc<dyn Transport>,
transport: Transport,
server_tx: Arc<Mutex<Option<Sender<Message>>>>,
}
impl Transport {
#[cfg(any(test, feature = "test-support"))]
fn fake(args: DebugAdapterBinary) -> Self {
let this = Self::Fake(FakeTransport::new());
}
}
impl TransportDelegate {
pub fn new(transport: Arc<dyn Transport>) -> Self {
pub async fn new(args: DebugAdapterBinary) -> Self {
Self {
transport,
transport: Transport::new(args).await,
server_tx: Default::default(),
log_handlers: Default::default(),
current_requests: Default::default(),
pending_requests: Default::default(),
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn fake(args: DebugAdapterBinary) -> Self {
Self {
transport: Transport::fake(args),
server_tx: Default::default(),
log_handlers: Default::default(),
current_requests: Default::default(),
@@ -475,41 +545,14 @@ impl TransportDelegate {
}
}
#[async_trait(?Send)]
pub trait Transport: 'static + Send + Sync + Any {
async fn start(&self, binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<TransportPipe>;
fn has_adapter_logs(&self) -> bool;
async fn kill(&self) -> Result<()>;
async fn reconnect(&self, _: &mut AsyncApp) -> Result<TransportPipe> {
bail!("Cannot reconnect to adapter")
}
#[cfg(any(test, feature = "test-support"))]
fn as_fake(&self) -> &FakeTransport {
panic!("Called as_fake on a real adapter");
}
}
pub struct TcpTransport {
port: u16,
host: Ipv4Addr,
timeout: Option<u64>,
process: Arc<Mutex<Option<Child>>>,
process: Arc<Mutex<Child>>,
}
impl TcpTransport {
pub fn new(host: Ipv4Addr, port: u16, timeout: Option<u64>) -> Self {
Self {
port,
host,
timeout,
process: Arc::new(Mutex::new(None)),
}
}
/// Get an open port to use with the tcp client when not supplied by debug config
pub async fn port(host: &TCPHost) -> Result<u16> {
if let Some(port) = host.port {
@@ -521,10 +564,13 @@ impl TcpTransport {
.port())
}
}
}
async fn port_for_host(host: Ipv4Addr) -> Result<u16> {
Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
.await?
.local_addr()?
.port())
}
#[async_trait(?Send)]
impl Transport for TcpTransport {
async fn reconnect(&self, cx: &mut AsyncApp) -> Result<TransportPipe> {
let address = SocketAddrV4::new(self.host, self.port);
@@ -563,7 +609,26 @@ impl Transport for TcpTransport {
))
}
async fn start(&self, binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<TransportPipe> {
async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<TransportPipe> {
let Some(connection_args) = binary.connection.as_ref() else {
return Err(anyhow!("No connection arguments provided"));
};
let host = connection_args.host;
let port = if let Some(port) = connection_args.port {
port
} else {
TcpListener::bind(SocketAddrV4::new(host, 0))
.await
.with_context(|| {
format!(
"Failed to connect to debug adapter over tcp. host: {}",
host
)
})?
.local_addr()?
.port()
};
let mut command = util::command::new_smol_command(&binary.command);
if let Some(cwd) = &binary.cwd {
@@ -588,16 +653,16 @@ impl Transport for TcpTransport {
.spawn()
.with_context(|| "failed to start debug adapter.")?;
let address = SocketAddrV4::new(self.host, self.port);
let address = SocketAddrV4::new(host, port);
let timeout = self.timeout.unwrap_or_else(|| {
let timeout = connection_args.timeout.unwrap_or_else(|| {
cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
.unwrap_or(2000u64)
});
let (rx, tx) = select! {
_ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", self.host, self.port)))
return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
},
result = cx.spawn(|cx| async move {
loop {
@@ -612,8 +677,8 @@ impl Transport for TcpTransport {
};
log::info!(
"Debug adapter has connected to TCP server {}:{}",
self.host,
self.port
host,
port
);
let stdout = process.stdout.take();
@@ -654,10 +719,7 @@ impl StdioTransport {
process: Arc::new(Mutex::new(None)),
}
}
}
#[async_trait(?Send)]
impl Transport for StdioTransport {
async fn start(&self, binary: &DebugAdapterBinary, _: &mut AsyncApp) -> Result<TransportPipe> {
let mut command = util::command::new_smol_command(&binary.command);
@@ -721,6 +783,10 @@ impl Transport for StdioTransport {
false
}
async fn reconnect(&self, _: &mut AsyncApp) -> Result<TransportPipe> {
bail!("Cannot reconnect to adapter")
}
async fn kill(&self) -> Result<()> {
if let Some(mut process) = self.process.lock().await.take() {
process.kill()?;
@@ -803,11 +869,7 @@ impl FakeTransport {
.await
.insert(R::COMMAND, Box::new(handler));
}
}
#[cfg(any(test, feature = "test-support"))]
#[async_trait(?Send)]
impl Transport for FakeTransport {
async fn reconnect(&self, cx: &mut AsyncApp) -> Result<TransportPipe> {
self.start(
&DebugAdapterBinary {
@@ -815,6 +877,7 @@ impl Transport for FakeTransport {
arguments: None,
envs: None,
cwd: None,
connection: TcpTransport::new(None, None),
},
cx,
)

View File

@@ -1,6 +1,6 @@
use std::{ffi::OsString, path::PathBuf, sync::Arc};
use dap::transport::{StdioTransport, TcpTransport, Transport};
use dap::transport::{StdioTransport, TcpTransport};
use gpui::AsyncApp;
use serde_json::Value;
use task::DebugAdapterConfig;
@@ -36,10 +36,6 @@ impl DebugAdapter for CustomDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
self.transport.clone()
}
async fn get_binary(
&self,
_: &dyn DapDelegate,

View File

@@ -1,4 +1,4 @@
use dap::transport::{TcpTransport, Transport};
use dap::transport::TcpTransport;
use gpui::AsyncApp;
use std::{ffi::OsStr, net::Ipv4Addr, path::PathBuf, sync::Arc};
@@ -28,10 +28,6 @@ impl DebugAdapter for GoDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(TcpTransport::new(self.host, self.port, self.timeout))
}
async fn get_binary(
&self,
delegate: &dyn DapDelegate,

View File

@@ -1,5 +1,5 @@
use adapters::latest_github_release;
use dap::transport::{TcpTransport, Transport};
use dap::transport::TcpTransport;
use gpui::AsyncApp;
use regex::Regex;
use std::{collections::HashMap, net::Ipv4Addr, path::PathBuf, sync::Arc};
@@ -33,10 +33,6 @@ impl DebugAdapter for JsDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(TcpTransport::new(self.host, self.port, self.timeout))
}
async fn fetch_latest_adapter_version(
&self,
delegate: &dyn DapDelegate,

View File

@@ -2,7 +2,7 @@ use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
use anyhow::Result;
use async_trait::async_trait;
use dap::transport::{StdioTransport, Transport};
use dap::transport::StdioTransport;
use gpui::AsyncApp;
use sysinfo::{Pid, Process};
use task::{DebugAdapterConfig, DebugRequestType};
@@ -25,10 +25,6 @@ impl DebugAdapter for LldbDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(StdioTransport::new())
}
async fn get_binary(
&self,
delegate: &dyn DapDelegate,

View File

@@ -1,5 +1,5 @@
use adapters::latest_github_release;
use dap::transport::{TcpTransport, Transport};
use dap::{adapters::TcpArguments, transport::TcpTransport};
use gpui::AsyncApp;
use std::{net::Ipv4Addr, path::PathBuf, sync::Arc};
@@ -30,10 +30,6 @@ impl DebugAdapter for PhpDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(TcpTransport::new(self.host, self.port, self.timeout))
}
async fn fetch_latest_adapter_version(
&self,
delegate: &dyn DapDelegate,
@@ -92,6 +88,11 @@ impl DebugAdapter for PhpDebugAdapter {
adapter_path.join(Self::ADAPTER_PATH).into(),
format!("--server={}", self.port).into(),
]),
connection: Some(TcpArguments {
port: Some(self.port),
host: Ipv4Addr::LOCALHOST,
timeout: None,
}),
cwd: config.cwd.clone(),
envs: None,
})

View File

@@ -1,5 +1,5 @@
use crate::*;
use dap::transport::{TcpTransport, Transport};
use dap::transport::TcpTransport;
use gpui::AsyncApp;
use std::{ffi::OsStr, net::Ipv4Addr, path::PathBuf, sync::Arc};
@@ -29,10 +29,6 @@ impl DebugAdapter for PythonDebugAdapter {
DebugAdapterName(Self::ADAPTER_NAME.into())
}
fn transport(&self) -> Arc<dyn Transport> {
Arc::new(TcpTransport::new(self.host, self.port, self.timeout))
}
async fn fetch_latest_adapter_version(
&self,
delegate: &dyn DapDelegate,

View File

@@ -573,7 +573,7 @@ impl DapLogView {
let client = client.read(cx).adapter_client()?;
Some(DapMenuItem {
client_id: client.id(),
client_name: client.adapter_id(),
client_name: unimplemented!(),
has_adapter_logs: client.has_adapter_logs(),
selected_entry: self.current_view.map_or(LogKind::Adapter, |(_, kind)| kind),
})

View File

@@ -620,7 +620,7 @@ impl DapStore {
}
};
let mut client = DebugAdapterClient::new(client_id, adapter, binary, &cx);
let mut client = DebugAdapterClient::new(client_id, binary, &cx);
client
.start(

View File

@@ -1,17 +1,20 @@
use super::breakpoint_store::BreakpointStore;
use super::dap_command::{
self, ContinueCommand, DapCommand, DisconnectCommand, EvaluateCommand, NextCommand,
PauseCommand, RestartCommand, RestartStackFrameCommand, ScopesCommand, SetVariableValueCommand,
StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
TerminateThreadsCommand, VariablesCommand,
};
use super::dap_store::DapAdapterDelegate;
use anyhow::{anyhow, Result};
use collections::{HashMap, IndexMap};
use dap::client::{DebugAdapterClient, DebugAdapterClientId};
use dap::{
Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, SteppingGranularity,
};
use dap_adapters::build_adapter;
use futures::{future::Shared, FutureExt};
use gpui::{App, Context, Entity, Task};
use gpui::{App, AppContext, Context, Entity, Task};
use rpc::AnyProtoClient;
use serde_json::Value;
use std::u64;
@@ -142,10 +145,37 @@ impl RemoteConnection {
}
pub enum Mode {
Local(Arc<DebugAdapterClient>),
Local(LocalMode),
Remote(RemoteConnection),
}
struct LocalMode {
client: Arc<DebugAdapterClient>,
}
impl LocalMode {
fn new(
breakpoint_store: Entity<BreakpointStore>,
disposition: DebugAdapterConfig,
delegate: DapAdapterDelegate,
cx: &mut App,
) -> Task<Result<Self>> {
cx.spawn(move |cx| async move {
let adapter = build_adapter(&config.kind).await?;
let binary = cx.update(|cx| {
let name = DebugAdapterName::from(adapter.name().as_ref());
ProjectSettings::get_global(cx)
.dap
.get(&name)
.and_then(|s| s.binary.as_ref().map(PathBuf::from))
})?;
todo!()
})
}
}
impl From<RemoteConnection> for Mode {
fn from(value: RemoteConnection) -> Self {
Self::Remote(value)
@@ -293,20 +323,30 @@ impl CompletionsQuery {
}
impl Session {
pub(crate) fn local(adapter: Arc<DebugAdapterClient>, config: DebugAdapterConfig) -> Self {
let client_id = adapter.id();
Self {
mode: Mode::Local(adapter),
client_id,
config,
capabilities: unimplemented!(),
ignore_breakpoints: false,
requests: HashMap::default(),
modules: Vec::default(),
loaded_sources: Vec::default(),
threads: IndexMap::default(),
}
pub(crate) fn local(
breakpoints: Entity<BreakpointStore>,
client_id: DebugAdapterClientId,
delegate: DapAdapterDelegate,
config: DebugAdapterConfig,
cx: &mut App,
) -> Task<Result<Entity<Self>>> {
cx.spawn(move |cx| async move {
let adapter = build_adapter(&config.kind).await?;
let mode = LocalMode::new(breakpoints, config, adapter, cx).await?;
cx.update(|cx| {
cx.new(|cx| Self {
mode: Mode::Local(mode),
client_id,
config,
capabilities: unimplemented!(),
ignore_breakpoints: false,
requests: HashMap::default(),
modules: Vec::default(),
loaded_sources: Vec::default(),
threads: IndexMap::default(),
})
})
})
}
pub(crate) fn remote(