Compare commits

...

12 Commits

Author SHA1 Message Date
Conrad Irwin
1ba60d1a8d Undo refactoring that broke things 2024-10-18 16:34:01 -06:00
Conrad Irwin
06f18bc365 Revert "ssh remoting: Undo the spawning of message handlers (#19409)"
This reverts commit ea460014ab.
2024-10-18 16:11:15 -06:00
Conrad Irwin
f5833010aa Merge branch 'main' into ssh-reconnect-reliability 2024-10-18 16:10:25 -06:00
Conrad Irwin
f86f06c81a Revert "Revert "SSH reconnect reliability (#19398)""
This reverts commit 9b0ee7d30b.
2024-10-18 16:05:04 -06:00
Conrad Irwin
9b0ee7d30b Revert "SSH reconnect reliability (#19398)"
This reverts commit 98ecb43b2d.
2024-10-18 15:57:01 -06:00
Conrad Irwin
47a764553d Revert "remote: Fix formatting (#19438)"
This reverts commit 47380001cc.
2024-10-18 15:57:00 -06:00
Conrad Irwin
d7ff85e2a1 Clippity Cloppity 2024-10-17 16:56:11 -06:00
Conrad Irwin
b2b6b1e8a1 Don't rebuild/re-upload binary on reconnect. 2024-10-17 16:19:06 -06:00
Conrad Irwin
023186b7a0 Fix message buffering during reconnect
Co-Authored-By: Nathan <nathan@zed.dev>
2024-10-17 16:07:57 -06:00
Conrad Irwin
e5ec08e8f8 Failing test for edits in parallel with disconnects 2024-10-17 14:29:09 -06:00
Conrad Irwin
2f6c7a2939 Implement the fake connection stuff 2024-10-17 14:10:03 -06:00
Conrad Irwin
2f26a74c83 Refactor Ssh connections to make faking more possible 2024-10-17 12:56:21 -06:00
3 changed files with 27 additions and 22 deletions

View File

@@ -532,7 +532,8 @@ impl SshRemoteClient {
let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?;
let client =
cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;
let this = cx.new_model(|_| Self {
client: client.clone(),
unique_identifier: unique_identifier.clone(),
@@ -783,7 +784,7 @@ impl SshRemoteClient {
cx.emit(SshRemoteEvent::Disconnected);
Ok(())
} else {
log::debug!("State has transition from Reconnecting into new state while attempting reconnect. Ignoring new state.");
log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
Ok(())
}
})
@@ -1185,7 +1186,7 @@ impl SshRemoteClient {
let (forwarder, _, _) =
ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx.to_async());
let client = ChannelClient::new(incoming_rx, outgoing_tx, cx);
let client = ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server");
(forwarder, client)
})
}
@@ -1557,6 +1558,7 @@ impl ChannelClient {
incoming_rx: mpsc::UnboundedReceiver<Envelope>,
outgoing_tx: mpsc::UnboundedSender<Envelope>,
cx: &AppContext,
name: &'static str,
) -> Arc<Self> {
let this = Arc::new(Self {
outgoing_tx,
@@ -1567,7 +1569,7 @@ impl ChannelClient {
buffer: Mutex::new(VecDeque::new()),
});
Self::start_handling_messages(this.clone(), incoming_rx, cx);
Self::start_handling_messages(this.clone(), incoming_rx, cx, name);
this
}
@@ -1576,6 +1578,7 @@ impl ChannelClient {
this: Arc<Self>,
mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
cx: &AppContext,
name: &'static str,
) {
cx.spawn(|cx| {
let this = Arc::downgrade(&this);
@@ -1591,17 +1594,16 @@ impl ChannelClient {
buffer.pop_front();
}
}
if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) =
&incoming.payload
{
if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload {
{
let buffer = this.buffer.lock();
for envelope in buffer.iter() {
this.outgoing_tx.unbounded_send(envelope.clone()).ok();
}
}
let response = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
this.send_dynamic(response).ok();
let mut envelope = proto::Ack{}.into_envelope(0, Some(incoming.id), None);
envelope.id = this.next_message_id.fetch_add(1, SeqCst);
this.outgoing_tx.unbounded_send(envelope)?;
continue;
}
@@ -1627,19 +1629,22 @@ impl ChannelClient {
this.clone().into(),
cx.clone(),
) {
log::debug!("ssh message received. name:{type_name}");
match future.await {
Ok(_) => {
log::debug!("ssh message handled. name:{type_name}");
log::debug!("{name}:ssh message received. name:{type_name}");
cx.foreground_executor().spawn(async move {
match future.await {
Ok(_) => {
log::debug!("{name}:ssh message handled. name:{type_name}");
}
Err(error) => {
log::error!(
"{name}:error handling message. type:{type_name}, error:{error}",
);
}
}
Err(error) => {
log::error!(
"error handling message. type:{type_name}, error:{error}",
);
}
}
}).detach();
} else {
log::error!("unhandled ssh message name:{type_name}");
log::error!("{name}:unhandled ssh message name:{type_name}");
}
}
}

View File

@@ -641,7 +641,7 @@ async fn test_open_server_settings(cx: &mut TestAppContext, server_cx: &mut Test
})
}
#[gpui::test(iterations = 10)]
#[gpui::test(iterations = 20)]
async fn test_reconnect(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
let (project, _headless, fs) = init_test(cx, server_cx).await;

View File

@@ -270,7 +270,7 @@ fn start_server(
})
.detach();
ChannelClient::new(incoming_rx, outgoing_tx, cx)
ChannelClient::new(incoming_rx, outgoing_tx, cx, "server")
}
fn init_paths() -> anyhow::Result<()> {