Compare commits

...

1 Commits

Author SHA1 Message Date
Thorsten Ball
3647c2e351 thorsten's log collection 2024-10-18 19:05:53 +02:00
5 changed files with 165 additions and 36 deletions

1
Cargo.lock generated
View File

@@ -9151,7 +9151,6 @@ dependencies = [
"fs",
"futures 0.3.30",
"git",
"git_hosting_providers",
"gpui",
"http_client",
"language",

View File

@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use prost::Message as _;
use rpc::proto::Envelope;
@@ -14,14 +14,25 @@ pub fn message_len_from_buffer(buffer: &[u8]) -> MessageLen {
MessageLen::from_le_bytes(buffer.try_into().unwrap())
}
pub fn write_to_global_log(message: &str) {
use std::io::Write;
let mut file =
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open("/Users/thorstenball/mylog.log".to_string())
.expect("Failed to open log file");
writeln!(file, "{}", message).expect("Failed to write to log file");
}
pub async fn read_message_with_len<S: AsyncRead + Unpin>(
stream: &mut S,
buffer: &mut Vec<u8>,
message_len: MessageLen,
) -> Result<Envelope> {
buffer.resize(message_len as usize, 0);
stream.read_exact(buffer).await?;
Ok(Envelope::decode(buffer.as_slice())?)
stream.read_exact(buffer).await.context("read exact failed")?;
Ok(Envelope::decode(buffer.as_slice()).context("decode failed")?)
}
pub async fn read_message<S: AsyncRead + Unpin>(
@@ -29,9 +40,13 @@ pub async fn read_message<S: AsyncRead + Unpin>(
buffer: &mut Vec<u8>,
) -> Result<Envelope> {
buffer.resize(MESSAGE_LEN_SIZE, 0);
stream.read_exact(buffer).await?;
stream.read_exact(buffer).await.context("read exact failed")?;
// log::debug!("read_exact DONE");
let len = message_len_from_buffer(buffer);
read_message_with_len(stream, buffer, len).await
// log::debug!("received message_len_from_buffer: {}", len);
let s = read_message_with_len(stream, buffer, len).await.with_context(|| format!("read message with len={} failed", len))?;
// log::debug!("received message with len: {}", len);
Ok(s)
}
pub async fn write_message<S: AsyncWrite + Unpin>(
@@ -50,6 +65,23 @@ pub async fn write_message<S: AsyncWrite + Unpin>(
Ok(())
}
pub async fn write_message_log<S: AsyncWrite + Unpin>(
stream: &mut S,
buffer: &mut Vec<u8>,
message: Envelope,
) -> Result<()> {
let message_len = message.encoded_len() as u32;
log::debug!("write_message_log. message_len: {}", message_len);
stream
.write_all(message_len.to_le_bytes().as_slice())
.await.context("failed to write message_len")?;
buffer.clear();
buffer.reserve(message_len as usize);
message.encode(buffer).context("Failed to encode message")?;
stream.write_all(buffer).await?;
Ok(())
}
pub async fn read_message_raw<S: AsyncRead + Unpin>(
stream: &mut S,
buffer: &mut Vec<u8>,
@@ -58,6 +90,7 @@ pub async fn read_message_raw<S: AsyncRead + Unpin>(
stream.read_exact(buffer).await?;
let message_len = message_len_from_buffer(buffer);
log::debug!("read_message_raw. message_len: {}", message_len);
buffer.resize(message_len as usize, 0);
stream.read_exact(buffer).await?;

View File

@@ -1,7 +1,7 @@
use crate::{
json_log::LogRecord,
protocol::{
message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
message_len_from_buffer, read_message_with_len, write_message_log, MessageId, MESSAGE_LEN_SIZE,
},
proxy::ProxyLaunchError,
};
@@ -11,10 +11,7 @@ use futures::{
channel::{
mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
oneshot,
},
future::BoxFuture,
select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt,
StreamExt as _,
}, future::BoxFuture, select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt, StreamExt as _
};
use gpui::{
AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task,
@@ -296,13 +293,17 @@ impl ChannelForwarder {
loop {
select_biased! {
_ = quit_rx.next().fuse() => {
log::debug!("ChannelForwarder. quit");
break;
},
incoming_envelope = proxy_incoming_rx.next().fuse() => {
// log::debug!("ChannelForwarder. incoming envelope.");
if let Some(envelope) = incoming_envelope {
// let id = envelope.id;
if incoming_tx.send(envelope).await.is_err() {
break;
}
// log::debug!("ChannelForwarder. incoming envelope sent. id: {:?}", id);
} else {
break;
}
@@ -310,8 +311,10 @@ impl ChannelForwarder {
outgoing_envelope = outgoing_rx.next().fuse() => {
if let Some(envelope) = outgoing_envelope {
if proxy_outgoing_tx.send(envelope).await.is_err() {
// log::debug!("ChannelForwarder. outgoing envelope errored.");
break;
}
// log::debug!("ChannelForwarder. outgoing envelope sent.");
} else {
break;
}
@@ -622,6 +625,7 @@ impl SshRemoteClient {
}
fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
log::debug!("reconnect...");
let mut lock = self.state.lock();
let can_reconnect = lock
@@ -638,6 +642,7 @@ impl SshRemoteClient {
return Err(anyhow!(error));
}
log::debug!("reconnect... taking state.");
let state = lock.take().unwrap();
let (attempts, mut ssh_connection, delegate, forwarder) = match state {
State::Connected {
@@ -836,6 +841,7 @@ impl SshRemoteClient {
keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
if missed_heartbeats != 0 {
log::debug!("Resetting missed heartbeats to 0");
missed_heartbeats = 0;
this.update(&mut cx, |this, mut cx| {
this.handle_heartbeat_result(missed_heartbeats, &mut cx)
@@ -863,6 +869,7 @@ impl SshRemoteClient {
MAX_MISSED_HEARTBEATS
);
} else if missed_heartbeats != 0 {
log::debug!("Resetting missed heartbeats to 0");
missed_heartbeats = 0;
} else {
continue;
@@ -938,18 +945,24 @@ impl SshRemoteClient {
return anyhow::Ok(None);
};
write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
// {"level":2,"module_path":"remote_server::unix","file":"crates/remote_server/src/unix.rs","line":281,"message":"(remote server) error reading message on stdin: read message with len=3793265416 failed."}
write_message_log(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
child_stdin.flush().await?;
}
result = child_stdout.read(&mut stdout_buffer).fuse() => {
match result {
Ok(0) => {
log::debug!("child_stdout. 0. closing channels");
child_stdin.close().await?;
outgoing_rx.close();
// log::debug!("child_stdout. 0. getting status");
let status = ssh_proxy_process.status().await?;
// log::debug!("child_stdout. 0. got status");
// If we don't have a code, we assume process
// has been killed and treat it as non-zero exit
// code
log::debug!("child_stdout. 0. returning");
return Ok(status.code().or_else(|| Some(1)));
}
Ok(len) => {
@@ -977,6 +990,7 @@ impl SshRemoteClient {
result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
match result {
Ok(len) => {
// log::debug!("child_stderr. len: {}", len);
stderr_offset += len;
let mut start_ix = 0;
while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
@@ -1006,8 +1020,11 @@ impl SshRemoteClient {
cx.spawn(|mut cx| async move {
let result = io_task.await;
log::debug!("IO Task finished. result: {:?}", result);
match result {
Ok(Some(exit_code)) => {
log::debug!("Proxy exited with code {:?}", exit_code);
if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
match error {
ProxyLaunchError::ServerNotRunning => {
@@ -1046,6 +1063,7 @@ impl SshRemoteClient {
cx: &mut ModelContext<Self>,
map: impl FnOnce(&State) -> Option<State>,
) {
log::debug!("try_set_state ...");
let mut lock = self.state.lock();
let new_state = lock.as_ref().and_then(map);
@@ -1053,6 +1071,7 @@ impl SshRemoteClient {
lock.replace(new_state);
cx.notify();
}
log::debug!("try_set_state done");
}
fn set_state(&self, state: State, cx: &mut ModelContext<Self>) {
@@ -1275,8 +1294,8 @@ impl SshRemoteConnection {
.args(connection_options.additional_args().unwrap_or(&Vec::new()))
.args([
"-N",
"-o",
"ControlPersist=no",
// "-o",
// "ControlPersist=no",
"-o",
"ControlMaster=yes",
"-o",
@@ -1519,20 +1538,21 @@ impl ChannelClient {
build_typed_envelope(peer_id, Instant::now(), incoming)
{
let type_name = envelope.payload_type_name();
let id = envelope.message_id();
if let Some(future) = ProtoMessageHandlerSet::handle_message(
&this.message_handlers,
envelope,
this.clone().into(),
cx.clone(),
) {
log::debug!("ssh message received. name:{type_name}");
log::debug!("ssh message received. name:{type_name} id:{id}");
match future.await {
Ok(_) => {
log::debug!("ssh message handled. name:{type_name}");
log::debug!("ssh message handled. name:{type_name} id:{id}");
}
Err(error) => {
log::error!(
"error handling message. type:{type_name}, error:{error}",
"error handling message. type:{type_name}, error:{error} id:{id}",
);
}
}
@@ -1573,7 +1593,15 @@ impl ChannelClient {
log::debug!("ssh request start. name:{}", T::NAME);
let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
async move {
let response = response.await?;
let response = futures::select! {
response = response.fuse() => response?,
_ = async {
loop {
smol::Timer::after(Duration::from_secs(3)).await;
log::debug!("Still waiting for response. name:{}", T::NAME);
}
}.fuse() => unreachable!(),
};
log::debug!("ssh request finish. name:{}", T::NAME);
T::Response::from_envelope(response)
.ok_or_else(|| anyhow!("received a response of the wrong type"))
@@ -1596,15 +1624,16 @@ impl ChannelClient {
pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
log::debug!("ssh send name:{}", T::NAME);
self.send_dynamic(payload.into_envelope(0, None, None))
self.send_dynamic(payload.into_envelope(0, None, None), T::NAME)
}
pub fn request_dynamic(
fn request_dynamic(
&self,
mut envelope: proto::Envelope,
type_name: &'static str,
) -> impl 'static + Future<Output = Result<proto::Envelope>> {
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
log::debug!("ssh send_dynamic name:{} id:{}", type_name, envelope.id);
let (tx, rx) = oneshot::channel();
let mut response_channels_lock = self.response_channels.lock();
response_channels_lock.insert(MessageId(envelope.id), tx);
@@ -1624,8 +1653,9 @@ impl ChannelClient {
}
}
pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
fn send_dynamic(&self, mut envelope: proto::Envelope, message_type: &'static str) -> Result<()> {
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
log::debug!("ssh send name:{} id:{}", message_type, envelope.id);
self.outgoing_tx.unbounded_send(envelope)?;
Ok(())
}
@@ -1640,12 +1670,12 @@ impl ProtoClient for ChannelClient {
self.request_dynamic(envelope, request_type).boxed()
}
fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
self.send_dynamic(envelope)
fn send(&self, envelope: proto::Envelope, message_type: &'static str) -> Result<()> {
self.send_dynamic(envelope, message_type)
}
fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
self.send_dynamic(envelope)
fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()> {
self.send_dynamic(envelope, message_type)
}
fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {

View File

@@ -31,7 +31,7 @@ env_logger.workspace = true
fs.workspace = true
futures.workspace = true
git.workspace = true
git_hosting_providers.workspace = true
# git_hosting_providers.workspace = true
gpui.workspace = true
http_client.workspace = true
language.workspace = true

View File

@@ -5,7 +5,7 @@ use client::ProxySettings;
use fs::{Fs, RealFs};
use futures::channel::mpsc;
use futures::{select, select_biased, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt};
use git::GitHostingProviderRegistry;
// use git::GitHostingProviderRegistry;
use gpui::{AppContext, Context as _, ModelContext, UpdateGlobal as _};
use http_client::{read_proxy_from_env, Uri};
use language::LanguageRegistry;
@@ -34,8 +34,61 @@ use std::{
};
use util::ResultExt;
fn init_logging_proxy() {
env_logger::builder()
// fn init_logging_proxy() {
// env_logger::builder()
// .format(|buf, record| {
// let mut log_record = LogRecord::new(record);
// log_record.message = format!("(remote proxy) {}", log_record.message);
// serde_json::to_writer(&mut *buf, &log_record)?;
// buf.write_all(b"\n")?;
// Ok(())
// })
// .init();
// }
fn init_logging_proxy(log_file_path: PathBuf) -> Result<Receiver<Vec<u8>>> {
struct MultiWrite {
file: Box<dyn std::io::Write + Send + 'static>,
channel: Sender<Vec<u8>>,
buffer: Vec<u8>,
}
impl std::io::Write for MultiWrite {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let written = self.file.write(buf)?;
self.buffer.extend_from_slice(&buf[..written]);
Ok(written)
}
fn flush(&mut self) -> std::io::Result<()> {
self.channel
.send_blocking(self.buffer.clone())
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error))?;
self.buffer.clear();
self.file.flush()
}
}
let log_file = Box::new(if log_file_path.exists() {
std::fs::OpenOptions::new()
.append(true)
.open(&log_file_path)
.context("Failed to open log file in append mode")?
} else {
std::fs::File::create(&log_file_path).context("Failed to create log file")?
});
let (tx, rx) = smol::channel::unbounded();
let target = Box::new(MultiWrite {
file: log_file,
channel: tx,
buffer: Vec::new(),
});
env_logger::Builder::from_default_env()
.target(env_logger::Target::Pipe(target))
.format(|buf, record| {
let mut log_record = LogRecord::new(record);
log_record.message = format!("(remote proxy) {}", log_record.message);
@@ -44,6 +97,8 @@ fn init_logging_proxy() {
Ok(())
})
.init();
Ok(rx)
}
fn init_logging_server(log_file_path: PathBuf) -> Result<Receiver<Vec<u8>>> {
@@ -223,14 +278,18 @@ fn start_server(
let message = match stdin_message {
Ok(message) => message,
Err(error) => {
log::warn!("error reading message on stdin: {}. exiting.", error);
log::warn!("error reading message on stdin: {}.", error);
// std::process::exit(1);
break;
}
};
log::debug!("got message on stdin. forwarding to application.");
if let Err(error) = incoming_tx.unbounded_send(message) {
log::error!("failed to send message to application: {:?}. exiting.", error);
// std::process::exit(1);
return Err(anyhow!(error));
}
// log::debug!("forwarded message to application.");
}
outgoing_message = outgoing_rx.next().fuse() => {
@@ -253,14 +312,18 @@ fn start_server(
log_message = log_rx.next().fuse() => {
if let Some(log_message) = log_message {
// write_to_global_log("writing log message...");
if let Err(error) = stderr_stream.write_all(&log_message).await {
log::error!("failed to write log message to stderr: {:?}", error);
// write_to_global_log(format!("writing log message to stderr failed: {:?}", error).as_str());
// log::error!("failed to write log message to stderr: {:?}", error);
break;
}
if let Err(error) = stderr_stream.flush().await {
log::error!("failed to flush stderr stream: {:?}", error);
// write_to_global_log(&format!("failed to flush stderr stream: {:?}", error));
// log::error!("failed to flush stderr stream: {:?}", error);
break;
}
// write_to_global_log("writing log message... DONE");
}
}
}
@@ -315,7 +378,7 @@ pub fn execute_run(
log::info!("starting headless gpui app");
let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
// let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
gpui::App::headless().run(move |cx| {
settings::init(cx);
HeadlessProject::init(cx);
@@ -325,8 +388,8 @@ pub fn execute_run(
client::init_settings(cx);
GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
git_hosting_providers::init(cx);
// GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
// git_hosting_providers::init(cx);
let project = cx.new_model(|cx| {
let fs = Arc::new(RealFs::new(Default::default(), None));
@@ -403,7 +466,8 @@ impl ServerPaths {
}
pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
init_logging_proxy();
let log_file = paths::remote_server_state_dir().join(&identifier).join("proxy.log");
init_logging_proxy(log_file).log_err();
init_panic_hook();
log::info!("starting proxy process. PID: {}", std::process::id());
@@ -443,6 +507,7 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
let mut stream = smol::net::unix::UnixStream::connect(&server_paths.stderr_socket).await?;
let mut stderr_buffer = vec![0; 2048];
loop {
log::debug!("proxy reading message from stderr socket");
match stream.read(&mut stderr_buffer).await {
Ok(0) => {
let error =
@@ -598,6 +663,7 @@ where
let mut buffer = Vec::new();
loop {
log::debug!("proxy reading message from {}", socket_name);
read_message_raw(&mut reader, &mut buffer)
.await
.with_context(|| format!("failed to read message from {}", socket_name))?;
@@ -617,6 +683,7 @@ async fn write_size_prefixed_buffer<S: AsyncWrite + Unpin>(
buffer: &mut Vec<u8>,
) -> Result<()> {
let len = buffer.len() as u32;
log::debug!("write_size_prefixed_buffer. writing len: {}", len);
stream.write_all(len.to_le_bytes().as_slice()).await?;
stream.write_all(buffer).await?;
Ok(())