Compare commits

...

4 Commits

Author SHA1 Message Date
Yesterday17
d1965c9242 Remove some unwraps 2024-04-29 10:57:48 +08:00
Yesterday17
08a81b8726 Await all futures 2024-04-29 10:39:49 +08:00
Yesterday17
cb6f0d9e87 Remove unused imports 2024-04-28 21:00:02 +08:00
Yesterday17
12364a71a1 Working mvp for remote terminal 2024-04-28 04:09:55 +08:00
13 changed files with 974 additions and 110 deletions

154
Cargo.lock generated
View File

@@ -100,7 +100,7 @@ dependencies = [
"miow",
"parking_lot",
"piper",
"polling 3.3.2",
"polling 3.7.0",
"regex-automata 0.4.5",
"rustix-openpty",
"serde",
@@ -563,7 +563,7 @@ dependencies = [
"futures-io",
"futures-lite 2.2.0",
"parking",
"polling 3.3.2",
"polling 3.7.0",
"rustix 0.38.32",
"slab",
"tracing",
@@ -1746,7 +1746,7 @@ checksum = "fba7adb4dd5aa98e5553510223000e7148f621165ec5f9acd7113f6ca4995298"
dependencies = [
"bitflags 2.4.2",
"log",
"polling 3.3.2",
"polling 3.7.0",
"rustix 0.38.32",
"slab",
"thiserror",
@@ -2926,7 +2926,7 @@ dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"memoffset 0.9.0",
"scopeguard",
]
@@ -4790,9 +4790,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.3.3"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "hex"
@@ -5173,7 +5173,7 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi 0.3.3",
"hermit-abi 0.3.9",
"libc",
"windows-sys 0.48.0",
]
@@ -5184,6 +5184,15 @@ version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a611371471e98973dbcab4e0ec66c31a10bc356eeb4d54a0e05eac8158fe38c"
[[package]]
name = "ioctl-rs"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d"
dependencies = [
"libc",
]
[[package]]
name = "iovec"
version = "0.1.4"
@@ -5988,6 +5997,15 @@ dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.9.0"
@@ -6230,6 +6248,20 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if",
"libc",
"memoffset 0.6.5",
"pin-utils",
]
[[package]]
name = "nix"
version = "0.27.1"
@@ -6239,7 +6271,7 @@ dependencies = [
"bitflags 2.4.2",
"cfg-if",
"libc",
"memoffset",
"memoffset 0.9.0",
]
[[package]]
@@ -6476,7 +6508,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi 0.3.3",
"hermit-abi 0.3.9",
"libc",
]
@@ -7216,12 +7248,13 @@ dependencies = [
[[package]]
name = "polling"
version = "3.3.2"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "545c980a3880efd47b2e262f6a4bb6daad6555cf3367aa9c4e52895f69537a41"
checksum = "645493cf344456ef24219d02a768cf1fb92ddf8c92161679ae3d91b91a637be3"
dependencies = [
"cfg-if",
"concurrent-queue",
"hermit-abi 0.3.9",
"pin-project-lite",
"rustix 0.38.32",
"tracing",
@@ -7234,6 +7267,27 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
[[package]]
name = "portable-pty"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806ee80c2a03dbe1a9fb9534f8d19e4c0546b790cde8fd1fea9d6390644cb0be"
dependencies = [
"anyhow",
"bitflags 1.3.2",
"downcast-rs",
"filedescriptor",
"lazy_static",
"libc",
"log",
"nix 0.25.1",
"serial",
"shared_library",
"shell-words",
"winapi",
"winreg 0.10.1",
]
[[package]]
name = "postage"
version = "0.5.0"
@@ -7968,7 +8022,7 @@ dependencies = [
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
"winreg 0.50.0",
]
[[package]]
@@ -8803,6 +8857,48 @@ dependencies = [
"serde",
]
[[package]]
name = "serial"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86"
dependencies = [
"serial-core",
"serial-unix",
"serial-windows",
]
[[package]]
name = "serial-core"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581"
dependencies = [
"libc",
]
[[package]]
name = "serial-unix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7"
dependencies = [
"ioctl-rs",
"libc",
"serial-core",
"termios",
]
[[package]]
name = "serial-windows"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162"
dependencies = [
"libc",
"serial-core",
]
[[package]]
name = "settings"
version = "0.1.0"
@@ -8892,6 +8988,16 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shared_library"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9e7e0f2bfae24d8a5b5a66c5b257a83c7412304311512a0c054cd5e619da11"
dependencies = [
"lazy_static",
"libc",
]
[[package]]
name = "shell-words"
version = "1.1.0"
@@ -9883,6 +9989,8 @@ dependencies = [
"futures 0.3.28",
"gpui",
"libc",
"polling 3.7.0",
"portable-pty",
"rand 0.8.5",
"schemars",
"serde",
@@ -9929,6 +10037,15 @@ dependencies = [
"workspace",
]
[[package]]
name = "termios"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a"
dependencies = [
"libc",
]
[[package]]
name = "text"
version = "0.1.0"
@@ -10789,7 +10906,7 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89daebc3e6fd160ac4aa9fc8b3bf71e1f74fbf92367ae71fb83a037e8bf164b9"
dependencies = [
"memoffset",
"memoffset 0.9.0",
"tempfile",
"winapi",
]
@@ -11504,7 +11621,7 @@ dependencies = [
"log",
"mach",
"memfd",
"memoffset",
"memoffset 0.9.0",
"paste",
"psm",
"rustix 0.38.32",
@@ -12194,6 +12311,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "winreg"
version = "0.50.0"

View File

@@ -458,6 +458,27 @@ impl Database {
.await
}
pub async fn update_remote_terminal(
&self,
update: &proto::UpdateRemoteTerminal,
connection: ConnectionId,
) -> Result<TransactionGuard<Vec<ConnectionId>>> {
let project_id = ProjectId::from_proto(update.project_id);
self.project_transaction(project_id, |tx| async move {
// Ensure the update comes from the host.
let project = project::Entity::find_by_id(project_id)
.one(&*tx)
.await?
.ok_or_else(|| anyhow!("no such project"))?;
if project.host_connection()? != connection {
return Err(anyhow!("can't update a project hosted by someone else"))?;
}
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
Ok(connection_ids)
})
.await
}
/// Starts the language server for the given connection.
pub async fn start_language_server(
&self,

View File

@@ -423,6 +423,13 @@ impl Server {
.add_message_handler(update_language_server)
.add_message_handler(update_diagnostic_summary)
.add_message_handler(update_worktree_settings)
.add_request_handler(user_handler(
forward_read_only_project_request::<proto::CreateRemoteTerminal>,
))
.add_request_handler(user_handler(
forward_read_only_project_request::<proto::InputRemoteTerminal>,
))
.add_message_handler(update_remote_terminal)
.add_request_handler(user_handler(
forward_read_only_project_request::<proto::GetHover>,
))
@@ -2572,6 +2579,30 @@ async fn update_diagnostic_summary(
Ok(())
}
/// Updates other participants with changes to the diagnostics
async fn update_remote_terminal(
message: proto::UpdateRemoteTerminal,
session: Session,
) -> Result<()> {
let guest_connection_ids = session
.db()
.await
.update_remote_terminal(&message, session.connection_id)
.await?;
broadcast(
Some(session.connection_id),
guest_connection_ids.iter().copied(),
|connection_id| {
session
.peer
.forward_send(session.connection_id, connection_id, message.clone())
},
);
Ok(())
}
/// Updates other participants with changes to the worktree settings
async fn update_worktree_settings(
message: proto::UpdateWorktreeSettings,

View File

@@ -30,7 +30,7 @@ use futures::{
future::{join_all, try_join_all, Shared},
select,
stream::FuturesUnordered,
AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
AsyncWriteExt, Future, FutureExt, SinkExt, StreamExt, TryFutureExt,
};
use git::{blame::Blame, repository::GitRepository};
use globset::{Glob, GlobSet, GlobSetBuilder};
@@ -653,6 +653,9 @@ impl Project {
client.add_model_request_handler(Self::handle_lsp_command::<lsp_ext_command::ExpandMacro>);
client.add_model_request_handler(Self::handle_blame_buffer);
client.add_model_request_handler(Self::handle_multi_lsp_query);
client.add_model_request_handler(Self::handle_create_terminal);
client.add_model_message_handler(Self::handle_update_terminal);
client.add_model_message_handler(Self::handle_input_terminal);
}
pub fn local(
@@ -716,6 +719,7 @@ impl Project {
nonce: StdRng::from_entropy().gen(),
terminals: Terminals {
local_handles: Vec::new(),
remote_handles: HashMap::default(),
},
copilot_lsp_subscription,
copilot_log_subscription: None,
@@ -872,6 +876,7 @@ impl Project {
nonce: StdRng::from_entropy().gen(),
terminals: Terminals {
local_handles: Vec::new(),
remote_handles: HashMap::default(),
},
copilot_lsp_subscription,
copilot_log_subscription: None,
@@ -8059,6 +8064,19 @@ impl Project {
}
}
async fn handle_create_terminal(
this: Model<Self>,
_: TypedEnvelope<proto::CreateRemoteTerminal>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<proto::CreateRemoteTerminalResponse> {
this.update(&mut cx, |project, cx| project.create_headless_terminal(cx))??;
// let terminal = Terminal::new_remote(&this, cx.clone());
// this.terminals.push(terminal.clone());
// cx.emit(Event::TerminalCreated(terminal));
Ok(proto::CreateRemoteTerminalResponse { terminal_id: 0 })
}
async fn handle_unshare_project(
this: Model<Self>,
_: TypedEnvelope<proto::UnshareProject>,
@@ -8622,6 +8640,44 @@ impl Project {
})?
}
async fn handle_update_terminal(
this: Model<Self>,
envelope: TypedEnvelope<proto::UpdateRemoteTerminal>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
let terminal_id = envelope.payload.terminal_id;
let sender = this.update(&mut cx, |this, _| {
this.terminals
.remote_handles
.get(&terminal_id)
.map(|s| s.to_owned())
})?;
if let Some(mut sender) = sender {
sender.send(envelope.payload.data).await?;
}
Ok(())
}
async fn handle_input_terminal(
this: Model<Self>,
envelope: TypedEnvelope<proto::InputRemoteTerminal>,
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
let terminal_id = envelope.payload.terminal_id;
let sender = this.update(&mut cx, |this, _| {
this.terminals
.remote_handles
.get(&terminal_id)
.map(|s| s.to_owned())
})?;
if let Some(mut sender) = sender {
sender.send(envelope.payload.data).await?;
}
Ok(())
}
async fn handle_update_buffer_file(
this: Model<Self>,
envelope: TypedEnvelope<proto::UpdateBufferFile>,

View File

@@ -1,11 +1,18 @@
use crate::Project;
use collections::HashMap;
use gpui::{AnyWindowHandle, Context, Entity, Model, ModelContext, WeakModel};
use futures::{
channel::mpsc::{self, UnboundedSender},
StreamExt,
};
use gpui::{AnyWindowHandle, Context, Entity, Model, ModelContext, Task, WeakModel};
use rpc::proto;
use settings::Settings;
use smol::channel::bounded;
use smol::{channel::bounded, io::AsyncReadExt};
use std::path::{Path, PathBuf};
use task::SpawnInTerminal;
use terminal::{
headless::{portable_pty::CommandBuilder, HeadlessTerminal},
pty::RemotePty,
terminal_settings::{self, Shell, TerminalSettings, VenvSettingsContent},
TaskState, TaskStatus, Terminal, TerminalBuilder,
};
@@ -16,6 +23,7 @@ use util::ResultExt;
pub struct Terminals {
pub(crate) local_handles: Vec<WeakModel<terminal::Terminal>>,
pub(crate) remote_handles: HashMap<u64, UnboundedSender<Vec<u8>>>,
}
impl Project {
@@ -25,12 +33,77 @@ impl Project {
spawn_task: Option<SpawnInTerminal>,
window: AnyWindowHandle,
cx: &mut ModelContext<Self>,
) -> anyhow::Result<Model<Terminal>> {
anyhow::ensure!(
!self.is_remote(),
"creating terminals as a guest is not supported yet"
);
) -> Task<anyhow::Result<Model<Terminal>>> {
if self.is_remote() {
println!("!!!is remote");
let rpc = self.client();
let project_id = self.remote_id().unwrap();
// let settings = TerminalSettings::get_global(cx);
cx.spawn(move |project, mut cx| async move {
let (pty, host_tx, mut input_rx) = RemotePty::new(&cx).await;
project.update(&mut cx, |this, _| {
this.terminals.remote_handles.insert(0, host_tx) // TODO: use real terminal id
})?;
let rpc2 = rpc.clone();
cx.spawn(|_| async move {
loop {
let input = input_rx.next().await.unwrap();
rpc2.send(proto::InputRemoteTerminal {
project_id,
terminal_id: 0,
data: input,
})
.unwrap();
}
//
})
.detach();
let response = rpc
.request(proto::CreateRemoteTerminal { project_id })
.await?;
println!("!!!request done");
println!("!!!pty done");
let terminal = TerminalBuilder::new_remote(
None,
terminal_settings::AlternateScroll::Off,
None,
pty,
)?;
println!("!!!terminal created");
let terminal_handle = cx
.new_model(|cx| terminal.subscribe(cx))
.map(|terminal_handle| terminal_handle);
let weak_handle = terminal_handle.as_ref().map(|t| t.downgrade());
if let Ok(terminal_handle) = weak_handle {
project.update(&mut cx, |this, _| {
this.terminals.local_handles.push(terminal_handle);
})?;
}
println!("!!!terminal model created");
terminal_handle
})
} else {
Task::ready(self.create_local_terminal(working_directory, spawn_task, window, cx))
}
}
pub fn create_local_terminal(
&mut self,
working_directory: Option<PathBuf>,
spawn_task: Option<SpawnInTerminal>,
window: AnyWindowHandle,
cx: &mut ModelContext<Self>,
) -> anyhow::Result<Model<Terminal>> {
let is_terminal = spawn_task.is_none();
let settings = TerminalSettings::get_global(cx);
let python_settings = settings.detect_venv.clone();
@@ -122,6 +195,168 @@ impl Project {
terminal
}
pub fn create_headless_terminal(&mut self, cx: &mut ModelContext<Self>) -> anyhow::Result<()> {
println!("create_headless_terminal");
let rpc = self.client();
let project_id = self.remote_id().unwrap();
let terminal = HeadlessTerminal::new(None, None)?;
let cmd = CommandBuilder::new("bash");
// let task: Task<anyhow::Result<()>> = cx.spawn(|_, cx| async move {
// let tty = Arc::new(Mutex::new(terminal.tty));
// let poll = terminal.poll.clone();
// let mut buffer = [0; 1024];
// let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
// 'event_loop: loop {
// // Wakeup the event loop when a synchronized update timeout was reached.
// events.clear();
// if let Err(err) = poll.wait(&mut events, Some(Duration::from_secs(2))) {
// match err.kind() {
// ErrorKind::Interrupted => continue,
// _ => {
// break 'event_loop;
// }
// }
// }
// // Handle synchronized update timeout.
// if events.is_empty() {
// // state.parser.stop_sync(&mut *self.terminal.lock());
// // self.event_proxy.send_event(Event::Wakeup);
// continue;
// }
// for event in events.iter() {
// match event.key {
// 0 => {
// if event.is_interrupt() {
// // Don't try to do I/O on a dead PTY.
// continue;
// }
// if event.readable {
// let tty = tty.clone();
// let n = smol::unblock(move || {
// tty.lock().unwrap().reader().read(&mut buffer)
// })
// .await;
// if let Err(e) = n {
// if let ErrorKind::Interrupted | ErrorKind::WouldBlock = e.kind()
// {
// continue;
// } else {
// break;
// }
// }
// let n = n.unwrap();
// if n == 0 {
// break;
// }
// println!("!!!Read {} bytes", n);
// rpc.send(proto::UpdateRemoteTerminal {
// terminal_id: 0,
// project_id,
// data: buffer[..n].to_vec(),
// })?;
// // if let Err(err) = self.pty_read(&mut state, &mut buf, pipe.as_mut())
// // {
// // // On Linux, a `read` on the master side of a PTY can fail
// // // with `EIO` if the client side hangs up. In that case,
// // // just loop back round for the inevitable `Exited` event.
// // // This sucks, but checking the process is either racy or
// // // blocking.
// // #[cfg(target_os = "linux")]
// // if err.raw_os_error() == Some(libc::EIO) {
// // continue;
// // }
// // error!("Error reading from PTY in event loop: {}", err);
// // break 'event_loop;
// // }
// }
// }
// _ => (),
// }
// }
// }
// // loop {
// // let tty = tty.clone();
// // let n = smol::unblock(move || tty.lock().unwrap().reader().read(&mut buffer)).await;
// // if let Err(e) = n {
// // if let ErrorKind::Interrupted | ErrorKind::WouldBlock = e.kind() {
// // continue;
// // } else {
// // break;
// // }
// // }
// // let n = n.unwrap();
// // if n == 0 {
// // break;
// // }
// // println!("!!!Read {} bytes", n);
// // rpc.send(proto::UpdateRemoteTerminal {
// // terminal_id: 0,
// // project_id,
// // data: buffer[..n].to_vec(),
// // })?;
// // }
// Ok(())
// });
let pair = terminal.pair;
let reader = pair.master.try_clone_reader()?;
let mut writer = pair.master.take_writer()?;
let read: Task<anyhow::Result<()>> = cx.spawn(|_, _| async move {
let _child = smol::unblock(move || pair.slave.spawn_command(cmd)).await?;
let mut reader = smol::Unblock::new(reader);
let mut buffer = [0; 1024];
loop {
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
println!("!!!Read {} bytes", n);
rpc.send(proto::UpdateRemoteTerminal {
terminal_id: 0,
project_id,
data: buffer[..n].to_vec(),
})?;
}
Ok(())
});
read.detach_and_log_err(cx);
let (input_tx, mut input_rx) = mpsc::unbounded();
let write = cx.spawn(|project, mut cx| async move {
project
.update(&mut cx, |project, _| {
project.terminals.remote_handles.insert(0, input_tx);
})
.unwrap();
loop {
let input = input_rx.next().await.unwrap();
writer.write_all(&input).unwrap();
}
});
write.detach();
// let mut reader = pair.master.try_clone_reader()?;
// writeln!(pair.master.take_writer()?, "ls -l\r\n")?;
Ok(())
}
pub fn find_activate_script_path(
&mut self,
settings: &VenvSettingsContent,

View File

@@ -237,6 +237,11 @@ message Envelope {
RemoteProjectsUpdate remote_projects_update = 193;
ValidateRemoteProjectRequest validate_remote_project_request = 194; // Current max
DeleteDevServer delete_dev_server = 195;
CreateRemoteTerminal create_remote_terminal = 196;
CreateRemoteTerminalResponse create_remote_terminal_response = 197;
UpdateRemoteTerminal update_remote_terminal = 198;
InputRemoteTerminal input_remote_terminal = 199;
}
reserved 158 to 161;
@@ -504,6 +509,26 @@ message ReconnectDevServerResponse {
repeated ResharedProject reshared_projects = 1;
}
message CreateRemoteTerminal {
uint64 project_id = 1;
}
message CreateRemoteTerminalResponse {
uint64 terminal_id = 1;
}
message UpdateRemoteTerminal {
uint64 project_id = 1;
uint64 terminal_id = 2;
bytes data = 3;
}
message InputRemoteTerminal {
uint64 project_id = 1;
uint64 terminal_id = 2;
bytes data = 3;
}
message DevServerInstructions {
repeated RemoteProject projects = 1;
}

View File

@@ -319,7 +319,11 @@ messages!(
(MultiLspQueryResponse, Background),
(RemoteProjectsUpdate, Foreground),
(ValidateRemoteProjectRequest, Background),
(DeleteDevServer, Foreground)
(DeleteDevServer, Foreground),
(CreateRemoteTerminal, Foreground),
(CreateRemoteTerminalResponse, Foreground),
(UpdateRemoteTerminal, Foreground),
(InputRemoteTerminal, Foreground),
);
request_messages!(
@@ -423,6 +427,9 @@ request_messages!(
(ValidateRemoteProjectRequest, Ack),
(MultiLspQuery, MultiLspQueryResponse),
(DeleteDevServer, Ack),
(CreateRemoteTerminal, CreateRemoteTerminalResponse),
(UpdateRemoteTerminal, Ack),
(InputRemoteTerminal, Ack),
);
entity_messages!(
@@ -479,6 +486,9 @@ entity_messages!(
UpdateWorktree,
UpdateWorktreeSettings,
LspExtExpandMacro,
CreateRemoteTerminal,
UpdateRemoteTerminal,
InputRemoteTerminal,
);
entity_messages!(

View File

@@ -32,6 +32,8 @@ smol.workspace = true
theme.workspace = true
thiserror.workspace = true
util.workspace = true
polling = "3.7.0"
portable-pty = "0.8.1"
[target.'cfg(windows)'.dependencies]
windows.workspace = true

View File

@@ -0,0 +1,71 @@
use gpui::AnyWindowHandle;
pub use polling;
pub use portable_pty;
use portable_pty::{native_pty_system, PtyPair, PtySize, PtySystem};
use crate::TerminalSize;
pub struct HeadlessTerminal {
_pty_system: Box<dyn PtySystem + Send>,
pub pair: PtyPair,
// pub tty: tty::Pty,
// pub poll: Arc<Poller>,
}
impl HeadlessTerminal {
pub fn new(
_window: Option<AnyWindowHandle>,
size: Option<TerminalSize>,
) -> anyhow::Result<Self> {
// setup_env();
// let poll = Arc::new(Poller::new().unwrap());
// let poll_opts = PollMode::Level;
// let interest = PollingEvent::readable(0);
// let mut pty = tty::new(
// &alacritty_terminal::tty::Options {
// shell: None,
// working_directory: Some("/tmp".into()),
// hold: false,
// env: Default::default(),
// },
// size.unwrap_or_else(|| TerminalSize::default()).into(),
// 0,
// // window.window_id().as_u64(),
// )
// .unwrap();
// // Register TTY through EventedRW interface.
// if let Err(err) = unsafe { pty.register(&poll, interest, poll_opts) } {
// anyhow::bail!("Failed to register TTY: {}", err);
// }
// let pty_system = native_pty_system();
// let pair = pty_system
// .openpty(PtySize {
// rows: size.map(|s| s.num_lines() as u16).unwrap_or(24),
// cols: size.map(|s| s.num_columns() as u16).unwrap_or(80),
// pixel_width: size.map(|s| s.width().0 as u16).unwrap_or(0),
// pixel_height: size.map(|s| s.height().0 as u16).unwrap_or(0),
// })
// .unwrap();
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: size.map(|s| s.num_lines() as u16).unwrap_or(24),
cols: size.map(|s| s.num_columns() as u16).unwrap_or(80),
pixel_width: size.map(|s| s.width().0 as u16).unwrap_or(0),
pixel_height: size.map(|s| s.height().0 as u16).unwrap_or(0),
})
.unwrap();
Ok(Self {
_pty_system: pty_system,
pair,
// tty: pty,
// poll: poll,
})
}
}

153
crates/terminal/src/pty.rs Normal file
View File

@@ -0,0 +1,153 @@
use std::{io::Write, os::unix::net::UnixStream};
use alacritty_terminal::{
event::OnResize,
tty::{EventedPty, EventedReadWrite},
};
use futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
AsyncReadExt, SinkExt,
};
use gpui::AsyncAppContext;
use polling::{Event, PollMode, Poller};
use smol::stream::StreamExt;
pub struct RemotePty {
reader: UnixStream,
writer: UnixStream,
}
impl RemotePty {
pub async fn new(
cx: &AsyncAppContext,
) -> (Self, UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) {
let (host_tx, host_rx) = mpsc::unbounded::<Vec<u8>>();
let (mut input_tx, input_rx) = mpsc::unbounded();
// let reader = VecDeque::new();
// println!("before open fifo");
// let reader = smol::unblock(|| File::open("/tmp/fifo").unwrap()).await;
// println!("after open fifo");
// let writer = VecDeque::new();
let (mut sender, reader) = UnixStream::pair().unwrap();
let (receiver, writer) = UnixStream::pair().unwrap();
reader.set_nonblocking(true).unwrap();
// reader.set_read_timeout(Some(Duration::from_secs(2)));
let result = Self { reader, writer };
cx.spawn(|_| async move {
// let mut write = File::create("/tmp/fifo").unwrap();
let mut rx = host_rx;
loop {
if let Some(data) = rx.next().await {
print!("!!on data ");
for byte in data.iter() {
print!("{:02x} ", byte);
}
println!();
let _ = sender.write_all(data.as_ref());
println!("!!data written");
} else {
println!("!!break");
break;
}
}
})
.detach();
cx.spawn(|_| async move {
let mut buffer = [0u8; 1024];
let mut receiver = smol::Unblock::new(receiver);
loop {
let n = receiver.read(&mut buffer).await.unwrap();
if n == 0 {
break;
}
print!("!!read data ");
for byte in buffer[..n].iter() {
print!("{:02x} ", byte);
}
println!();
input_tx.send(buffer[..n].to_vec()).await.unwrap();
}
})
.detach();
(result, host_tx, input_rx)
}
// pub fn write_to_reader(&mut self, data: &[u8]) -> std::io::Result<()> {
// self.reader.write_all(data)?;
// if let Some(poll) = &*self.poller.lock().unwrap() {
// let _ = poll.notify();
// }
// Ok(())
// }
}
impl EventedReadWrite for RemotePty {
// type Reader = VecDeque<u8>;
type Reader = UnixStream;
type Writer = UnixStream;
unsafe fn register(
&mut self,
poll: &std::sync::Arc<Poller>,
mut interest: Event,
mode: PollMode,
) -> std::io::Result<()> {
println!("register");
interest.key = 0; // PTY_READ_WRITE_TOKEN
poll.add_with_mode(&self.reader, interest, mode)?;
// if !self.reader.is_empty() {
// poll.notify()?;
// }
Ok(())
}
fn reregister(
&mut self,
poll: &std::sync::Arc<Poller>,
interest: Event,
mode: PollMode,
) -> std::io::Result<()> {
println!("reregister");
poll.modify_with_mode(&self.reader, interest, mode)?;
// if !self.reader.is_empty() {
// poll.notify()?;
// }
Ok(())
}
fn deregister(&mut self, poll: &std::sync::Arc<Poller>) -> std::io::Result<()> {
println!("deregister");
poll.delete(&self.reader)?;
Ok(())
}
fn reader(&mut self) -> &mut Self::Reader {
println!("reader");
&mut self.reader
}
fn writer(&mut self) -> &mut Self::Writer {
println!("writer");
&mut self.writer
}
}
impl EventedPty for RemotePty {
fn next_child_event(&mut self) -> Option<alacritty_terminal::tty::ChildEvent> {
None
}
}
impl OnResize for RemotePty {
fn on_resize(&mut self, window_size: alacritty_terminal::event::WindowSize) {
// todo!()
println!("resize")
}
}

View File

@@ -1,4 +1,6 @@
pub mod headless;
pub mod mappings;
pub mod pty;
pub use alacritty_terminal;
@@ -6,7 +8,7 @@ mod pty_info;
pub mod terminal_settings;
use alacritty_terminal::{
event::{Event as AlacTermEvent, EventListener, Notify, WindowSize},
event::{Event as AlacTermEvent, EventListener, Notify, OnResize, WindowSize},
event_loop::{EventLoop, Msg, Notifier},
grid::{Dimensions, Scroll as AlacScroll},
index::{Boundary, Column, Direction as AlacDirection, Line, Point as AlacPoint},
@@ -38,7 +40,7 @@ use futures::StreamExt;
use pty_info::PtyProcessInfo;
use serde::{Deserialize, Serialize};
use settings::Settings;
use smol::channel::{Receiver, Sender};
use smol::channel::{bounded, Receiver, Sender};
use task::TaskId;
use terminal_settings::{AlternateScroll, Shell, TerminalBlink, TerminalSettings};
use theme::{ActiveTheme, Theme};
@@ -417,7 +419,100 @@ impl TerminalBuilder {
last_mouse: None,
matches: Vec::new(),
selection_head: None,
pty_info,
pty_info: Some(pty_info),
breadcrumb_text: String::new(),
scroll_px: px(0.),
last_mouse_position: None,
next_link_id: 0,
selection_phase: SelectionPhase::Ended,
secondary_pressed: false,
hovered_word: false,
url_regex,
word_regex,
};
Ok(TerminalBuilder {
terminal,
events_rx,
})
}
#[allow(clippy::too_many_arguments)]
pub fn new_remote<T>(
blink_settings: Option<TerminalBlink>,
alternate_scroll: AlternateScroll,
max_scroll_history_lines: Option<usize>,
pty: T,
) -> Result<TerminalBuilder>
where
T: tty::EventedPty + OnResize + Send + 'static,
{
let scrolling_history = max_scroll_history_lines
.unwrap_or(DEFAULT_SCROLL_HISTORY_LINES)
.min(MAX_SCROLL_HISTORY_LINES);
let config = Config {
scrolling_history,
..Config::default()
};
println!("!!config done");
//Spawn a task so the Alacritty EventLoop can communicate with us in a view context
//TODO: Remove with a bounded sender which can be dispatched on &self
let (events_tx, events_rx) = unbounded();
//Set up the terminal...
let mut term = Term::new(
config,
&TerminalSize::default(),
ZedListener(events_tx.clone()),
);
//Start off blinking if we need to
if let Some(TerminalBlink::On) = blink_settings {
term.set_private_mode(PrivateMode::Named(NamedPrivateMode::BlinkingCursor));
}
//Alacritty defaults to alternate scrolling being on, so we just need to turn it off.
if let AlternateScroll::Off = alternate_scroll {
term.unset_private_mode(PrivateMode::Named(NamedPrivateMode::AlternateScroll));
}
let term = Arc::new(FairMutex::new(term));
println!("!!term done");
// let pty_info = PtyProcessInfo::new(&pty);
//And connect them together
let event_loop = EventLoop::new(
term.clone(),
ZedListener(events_tx.clone()),
pty,
false,
true,
)?;
println!("!!loop done");
//Kick things off
let pty_tx = event_loop.channel();
let _io_thread = event_loop.spawn(); // DANGER
println!("!!spawn done");
let url_regex = RegexSearch::new(r#"(ipfs:|ipns:|magnet:|mailto:|gemini://|gopher://|https://|http://|news:|file://|git://|ssh:|ftp://)[^\u{0000}-\u{001F}\u{007F}-\u{009F}<>"\s{-}\^⟨⟩`]+"#).unwrap();
let word_regex = RegexSearch::new(r#"[\$\+\w.\[\]:/@\-~]+"#).unwrap();
let (completion_tx, _) = bounded(1);
let terminal = Terminal {
task: None,
pty_tx: Notifier(pty_tx),
completion_tx,
term,
events: VecDeque::with_capacity(10), //Should never get this high.
last_content: Default::default(),
last_mouse: None,
matches: Vec::new(),
selection_head: None,
pty_info: None,
breadcrumb_text: String::new(),
scroll_px: px(0.),
last_mouse_position: None,
@@ -438,7 +533,9 @@ impl TerminalBuilder {
pub fn subscribe(mut self, cx: &mut ModelContext<Terminal>) -> Terminal {
//Event loop
cx.spawn(|terminal, mut cx| async move {
println!("subscribed");
while let Some(event) = self.events_rx.next().await {
println!("event_rx");
terminal.update(&mut cx, |terminal, cx| {
//Process the first event immediately for lowered latency
terminal.process_event(&event, cx);
@@ -572,7 +669,7 @@ pub struct Terminal {
pub last_content: TerminalContent,
pub selection_head: Option<AlacPoint>,
pub breadcrumb_text: String,
pub pty_info: PtyProcessInfo,
pub pty_info: Option<PtyProcessInfo>,
scroll_px: Pixels,
next_link_id: usize,
selection_phase: SelectionPhase,
@@ -620,6 +717,7 @@ impl TaskStatus {
impl Terminal {
fn process_event(&mut self, event: &AlacTermEvent, cx: &mut ModelContext<Self>) {
println!("event {event:?}");
match event {
AlacTermEvent::Title(title) => {
self.breadcrumb_text = title.to_string();
@@ -654,8 +752,10 @@ impl Terminal {
AlacTermEvent::Wakeup => {
cx.emit(Event::Wakeup);
if self.pty_info.has_changed() {
cx.emit(Event::TitleChanged);
if let Some(pty_info) = &mut self.pty_info {
if pty_info.has_changed() {
cx.emit(Event::TitleChanged);
}
}
}
AlacTermEvent::ColorRequest(idx, fun_ptr) => {
@@ -673,7 +773,11 @@ impl Terminal {
}
pub fn get_cwd(&self) -> Option<PathBuf> {
self.pty_info.current.as_ref().map(|info| info.cwd.clone())
if let Some(pty_info) = &self.pty_info {
pty_info.current.as_ref().map(|info| info.cwd.clone())
} else {
None
}
}
///Takes events from Alacritty and translates them to behavior on this view
@@ -1372,38 +1476,43 @@ impl Terminal {
task_state.full_label.clone()
}
}
None => self
.pty_info
.current
.as_ref()
.map(|fpi| {
let process_file = fpi
.cwd
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_default();
None => {
if let Some(pty_info) = &self.pty_info {
pty_info
.current
.as_ref()
.map(|fpi| {
let process_file = fpi
.cwd
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_default();
let argv = fpi.argv.clone();
let process_name = format!(
"{}{}",
fpi.name,
if argv.len() >= 1 {
format!(" {}", (argv[1..]).join(" "))
} else {
"".to_string()
}
);
let (process_file, process_name) = if truncate {
(
truncate_and_trailoff(&process_file, MAX_CHARS),
truncate_and_trailoff(&process_name, MAX_CHARS),
)
} else {
(process_file, process_name)
};
format!("{process_file}{process_name}")
})
.unwrap_or_else(|| "Terminal".to_string()),
let argv = fpi.argv.clone();
let process_name = format!(
"{}{}",
fpi.name,
if argv.len() >= 1 {
format!(" {}", (argv[1..]).join(" "))
} else {
"".to_string()
}
);
let (process_file, process_name) = if truncate {
(
truncate_and_trailoff(&process_file, MAX_CHARS),
truncate_and_trailoff(&process_name, MAX_CHARS),
)
} else {
(process_file, process_name)
};
format!("{process_file}{process_name}")
})
.unwrap_or_else(|| "Terminal".to_string())
} else {
"Terminal".to_string()
}
}
}
}

View File

@@ -494,35 +494,46 @@ impl TerminalPanel {
self.pending_terminals_to_add += 1;
cx.spawn(|terminal_panel, mut cx| async move {
let pane = terminal_panel.update(&mut cx, |this, _| this.pane.clone())?;
workspace.update(&mut cx, |workspace, cx| {
let working_directory = if let Some(working_directory) = working_directory {
let working_directory = workspace.update(&mut cx, |workspace, cx| {
if let Some(working_directory) = working_directory {
Some(working_directory)
} else {
let working_directory_strategy =
TerminalSettings::get_global(cx).working_directory.clone();
crate::get_working_directory(workspace, cx, working_directory_strategy)
};
let window = cx.window_handle();
if let Some(terminal) = workspace.project().update(cx, |project, cx| {
project
.create_terminal(working_directory, spawn_task, window, cx)
.log_err()
}) {
let terminal = Box::new(cx.new_view(|cx| {
TerminalView::new(
terminal,
workspace.weak_handle(),
workspace.database_id(),
cx,
)
}));
pane.update(cx, |pane, cx| {
let focus = pane.has_focus(cx);
pane.add_item(terminal, true, focus, None, cx);
});
}
})?;
let window = cx.window_handle();
let terminal = workspace
.update(&mut cx, |workspace, cx| {
workspace.project().update(cx, |project, cx| {
project
.create_terminal(working_directory, spawn_task, window, cx)
.log_err()
})
})?
.await;
if let Some(terminal) = terminal {
let terminal = Box::new(
workspace
.update(&mut cx, |workspace, cx| {
cx.new_view(|cx| {
TerminalView::new(
terminal,
workspace.weak_handle(),
workspace.database_id(),
cx,
)
})
})
.unwrap(),
);
pane.update(&mut cx, |pane, cx| {
let focus = pane.has_focus(cx);
pane.add_item(terminal, true, focus, None, cx);
})
.log_err();
}
terminal_panel.update(&mut cx, |this, cx| {
this.pending_terminals_to_add = this.pending_terminals_to_add.saturating_sub(1);
this.serialize(cx)
@@ -590,14 +601,23 @@ impl TerminalPanel {
.ok()?;
let reveal = spawn_task.reveal;
let window = cx.window_handle();
let new_terminal = project.update(cx, |project, cx| {
project
.create_terminal(working_directory, Some(spawn_task), window, cx)
cx.spawn(|_, mut cx| async move {
let new_terminal = project
.update(&mut cx, |project, cx| {
project.create_terminal(working_directory, Some(spawn_task), window, cx)
})
.unwrap()
.await
.log_err()
})?;
terminal_to_replace.update(cx, |terminal_to_replace, cx| {
terminal_to_replace.set_terminal(new_terminal, cx);
});
.unwrap();
cx.update(|cx| {
terminal_to_replace.update(cx, |terminal_to_replace, cx| {
terminal_to_replace.set_terminal(new_terminal, cx);
});
})
.unwrap();
})
.detach();
match reveal {
RevealStrategy::Always => {

View File

@@ -118,24 +118,27 @@ impl TerminalView {
get_working_directory(workspace, cx, strategy.working_directory.clone());
let window = cx.window_handle();
let terminal = workspace
.project()
.update(cx, |project, cx| {
project.create_terminal(working_directory, None, window, cx)
})
.notify_err(workspace, cx);
let terminal = workspace.project().update(cx, |project, cx| {
project.create_terminal(working_directory, None, window, cx)
});
let workspace_handle = workspace.weak_handle();
let workspace_id = workspace.database_id();
if let Some(terminal) = terminal {
let view = cx.new_view(|cx| {
TerminalView::new(
terminal,
workspace.weak_handle(),
workspace.database_id(),
cx,
)
});
workspace.add_item_to_active_pane(Box::new(view), cx)
}
cx.spawn(|workspace, mut cx| async move {
if let Some(terminal) = terminal.await.notify_async_err(&mut cx) {
let view = cx
.new_view(|cx| TerminalView::new(terminal, workspace_handle, workspace_id, cx))
.notify_async_err(&mut cx);
if let Some(view) = view {
workspace
.update(&mut cx, |workspace, cx| {
workspace.add_item_to_active_pane(Box::new(view), cx);
})
.notify_async_err(&mut cx);
}
}
})
.detach();
}
pub fn new(
@@ -894,9 +897,11 @@ impl Item for TerminalView {
})
.filter(|cwd| !cwd.as_os_str().is_empty());
let terminal = project.update(&mut cx, |project, cx| {
project.create_terminal(cwd, None, window, cx)
})??;
let terminal = project
.update(&mut cx, |project, cx| {
project.create_terminal(cwd, None, window, cx)
})?
.await?;
pane.update(&mut cx, |_, cx| {
cx.new_view(|cx| TerminalView::new(terminal, workspace, workspace_id, cx))
})