Compare commits
1 Commits
v0.202.8
...
thorsten-g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3647c2e351 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -9151,7 +9151,6 @@ dependencies = [
|
||||
"fs",
|
||||
"futures 0.3.30",
|
||||
"git",
|
||||
"git_hosting_providers",
|
||||
"gpui",
|
||||
"http_client",
|
||||
"language",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use prost::Message as _;
|
||||
use rpc::proto::Envelope;
|
||||
@@ -14,14 +14,25 @@ pub fn message_len_from_buffer(buffer: &[u8]) -> MessageLen {
|
||||
MessageLen::from_le_bytes(buffer.try_into().unwrap())
|
||||
}
|
||||
|
||||
pub fn write_to_global_log(message: &str) {
|
||||
use std::io::Write;
|
||||
let mut file =
|
||||
std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open("/Users/thorstenball/mylog.log".to_string())
|
||||
.expect("Failed to open log file");
|
||||
writeln!(file, "{}", message).expect("Failed to write to log file");
|
||||
}
|
||||
|
||||
pub async fn read_message_with_len<S: AsyncRead + Unpin>(
|
||||
stream: &mut S,
|
||||
buffer: &mut Vec<u8>,
|
||||
message_len: MessageLen,
|
||||
) -> Result<Envelope> {
|
||||
buffer.resize(message_len as usize, 0);
|
||||
stream.read_exact(buffer).await?;
|
||||
Ok(Envelope::decode(buffer.as_slice())?)
|
||||
stream.read_exact(buffer).await.context("read exact failed")?;
|
||||
Ok(Envelope::decode(buffer.as_slice()).context("decode failed")?)
|
||||
}
|
||||
|
||||
pub async fn read_message<S: AsyncRead + Unpin>(
|
||||
@@ -29,9 +40,13 @@ pub async fn read_message<S: AsyncRead + Unpin>(
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<Envelope> {
|
||||
buffer.resize(MESSAGE_LEN_SIZE, 0);
|
||||
stream.read_exact(buffer).await?;
|
||||
stream.read_exact(buffer).await.context("read exact failed")?;
|
||||
// log::debug!("read_exact DONE");
|
||||
let len = message_len_from_buffer(buffer);
|
||||
read_message_with_len(stream, buffer, len).await
|
||||
// log::debug!("received message_len_from_buffer: {}", len);
|
||||
let s = read_message_with_len(stream, buffer, len).await.with_context(|| format!("read message with len={} failed", len))?;
|
||||
// log::debug!("received message with len: {}", len);
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
pub async fn write_message<S: AsyncWrite + Unpin>(
|
||||
@@ -50,6 +65,23 @@ pub async fn write_message<S: AsyncWrite + Unpin>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn write_message_log<S: AsyncWrite + Unpin>(
|
||||
stream: &mut S,
|
||||
buffer: &mut Vec<u8>,
|
||||
message: Envelope,
|
||||
) -> Result<()> {
|
||||
let message_len = message.encoded_len() as u32;
|
||||
log::debug!("write_message_log. message_len: {}", message_len);
|
||||
stream
|
||||
.write_all(message_len.to_le_bytes().as_slice())
|
||||
.await.context("failed to write message_len")?;
|
||||
buffer.clear();
|
||||
buffer.reserve(message_len as usize);
|
||||
message.encode(buffer).context("Failed to encode message")?;
|
||||
stream.write_all(buffer).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn read_message_raw<S: AsyncRead + Unpin>(
|
||||
stream: &mut S,
|
||||
buffer: &mut Vec<u8>,
|
||||
@@ -58,6 +90,7 @@ pub async fn read_message_raw<S: AsyncRead + Unpin>(
|
||||
stream.read_exact(buffer).await?;
|
||||
|
||||
let message_len = message_len_from_buffer(buffer);
|
||||
log::debug!("read_message_raw. message_len: {}", message_len);
|
||||
buffer.resize(message_len as usize, 0);
|
||||
stream.read_exact(buffer).await?;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
json_log::LogRecord,
|
||||
protocol::{
|
||||
message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
|
||||
message_len_from_buffer, read_message_with_len, write_message_log, MessageId, MESSAGE_LEN_SIZE,
|
||||
},
|
||||
proxy::ProxyLaunchError,
|
||||
};
|
||||
@@ -11,10 +11,7 @@ use futures::{
|
||||
channel::{
|
||||
mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
|
||||
oneshot,
|
||||
},
|
||||
future::BoxFuture,
|
||||
select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt,
|
||||
StreamExt as _,
|
||||
}, future::BoxFuture, select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt, StreamExt as _
|
||||
};
|
||||
use gpui::{
|
||||
AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task,
|
||||
@@ -296,13 +293,17 @@ impl ChannelForwarder {
|
||||
loop {
|
||||
select_biased! {
|
||||
_ = quit_rx.next().fuse() => {
|
||||
log::debug!("ChannelForwarder. quit");
|
||||
break;
|
||||
},
|
||||
incoming_envelope = proxy_incoming_rx.next().fuse() => {
|
||||
// log::debug!("ChannelForwarder. incoming envelope.");
|
||||
if let Some(envelope) = incoming_envelope {
|
||||
// let id = envelope.id;
|
||||
if incoming_tx.send(envelope).await.is_err() {
|
||||
break;
|
||||
}
|
||||
// log::debug!("ChannelForwarder. incoming envelope sent. id: {:?}", id);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@@ -310,8 +311,10 @@ impl ChannelForwarder {
|
||||
outgoing_envelope = outgoing_rx.next().fuse() => {
|
||||
if let Some(envelope) = outgoing_envelope {
|
||||
if proxy_outgoing_tx.send(envelope).await.is_err() {
|
||||
// log::debug!("ChannelForwarder. outgoing envelope errored.");
|
||||
break;
|
||||
}
|
||||
// log::debug!("ChannelForwarder. outgoing envelope sent.");
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@@ -622,6 +625,7 @@ impl SshRemoteClient {
|
||||
}
|
||||
|
||||
fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
|
||||
log::debug!("reconnect...");
|
||||
let mut lock = self.state.lock();
|
||||
|
||||
let can_reconnect = lock
|
||||
@@ -638,6 +642,7 @@ impl SshRemoteClient {
|
||||
return Err(anyhow!(error));
|
||||
}
|
||||
|
||||
log::debug!("reconnect... taking state.");
|
||||
let state = lock.take().unwrap();
|
||||
let (attempts, mut ssh_connection, delegate, forwarder) = match state {
|
||||
State::Connected {
|
||||
@@ -836,6 +841,7 @@ impl SshRemoteClient {
|
||||
keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
|
||||
|
||||
if missed_heartbeats != 0 {
|
||||
log::debug!("Resetting missed heartbeats to 0");
|
||||
missed_heartbeats = 0;
|
||||
this.update(&mut cx, |this, mut cx| {
|
||||
this.handle_heartbeat_result(missed_heartbeats, &mut cx)
|
||||
@@ -863,6 +869,7 @@ impl SshRemoteClient {
|
||||
MAX_MISSED_HEARTBEATS
|
||||
);
|
||||
} else if missed_heartbeats != 0 {
|
||||
log::debug!("Resetting missed heartbeats to 0");
|
||||
missed_heartbeats = 0;
|
||||
} else {
|
||||
continue;
|
||||
@@ -938,18 +945,24 @@ impl SshRemoteClient {
|
||||
return anyhow::Ok(None);
|
||||
};
|
||||
|
||||
write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
|
||||
// {"level":2,"module_path":"remote_server::unix","file":"crates/remote_server/src/unix.rs","line":281,"message":"(remote server) error reading message on stdin: read message with len=3793265416 failed."}
|
||||
write_message_log(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
|
||||
child_stdin.flush().await?;
|
||||
}
|
||||
|
||||
result = child_stdout.read(&mut stdout_buffer).fuse() => {
|
||||
match result {
|
||||
Ok(0) => {
|
||||
log::debug!("child_stdout. 0. closing channels");
|
||||
child_stdin.close().await?;
|
||||
outgoing_rx.close();
|
||||
// log::debug!("child_stdout. 0. getting status");
|
||||
let status = ssh_proxy_process.status().await?;
|
||||
// log::debug!("child_stdout. 0. got status");
|
||||
// If we don't have a code, we assume process
|
||||
// has been killed and treat it as non-zero exit
|
||||
// code
|
||||
log::debug!("child_stdout. 0. returning");
|
||||
return Ok(status.code().or_else(|| Some(1)));
|
||||
}
|
||||
Ok(len) => {
|
||||
@@ -977,6 +990,7 @@ impl SshRemoteClient {
|
||||
result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
|
||||
match result {
|
||||
Ok(len) => {
|
||||
// log::debug!("child_stderr. len: {}", len);
|
||||
stderr_offset += len;
|
||||
let mut start_ix = 0;
|
||||
while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
|
||||
@@ -1006,8 +1020,11 @@ impl SshRemoteClient {
|
||||
cx.spawn(|mut cx| async move {
|
||||
let result = io_task.await;
|
||||
|
||||
log::debug!("IO Task finished. result: {:?}", result);
|
||||
|
||||
match result {
|
||||
Ok(Some(exit_code)) => {
|
||||
log::debug!("Proxy exited with code {:?}", exit_code);
|
||||
if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
|
||||
match error {
|
||||
ProxyLaunchError::ServerNotRunning => {
|
||||
@@ -1046,6 +1063,7 @@ impl SshRemoteClient {
|
||||
cx: &mut ModelContext<Self>,
|
||||
map: impl FnOnce(&State) -> Option<State>,
|
||||
) {
|
||||
log::debug!("try_set_state ...");
|
||||
let mut lock = self.state.lock();
|
||||
let new_state = lock.as_ref().and_then(map);
|
||||
|
||||
@@ -1053,6 +1071,7 @@ impl SshRemoteClient {
|
||||
lock.replace(new_state);
|
||||
cx.notify();
|
||||
}
|
||||
log::debug!("try_set_state done");
|
||||
}
|
||||
|
||||
fn set_state(&self, state: State, cx: &mut ModelContext<Self>) {
|
||||
@@ -1275,8 +1294,8 @@ impl SshRemoteConnection {
|
||||
.args(connection_options.additional_args().unwrap_or(&Vec::new()))
|
||||
.args([
|
||||
"-N",
|
||||
"-o",
|
||||
"ControlPersist=no",
|
||||
// "-o",
|
||||
// "ControlPersist=no",
|
||||
"-o",
|
||||
"ControlMaster=yes",
|
||||
"-o",
|
||||
@@ -1519,20 +1538,21 @@ impl ChannelClient {
|
||||
build_typed_envelope(peer_id, Instant::now(), incoming)
|
||||
{
|
||||
let type_name = envelope.payload_type_name();
|
||||
let id = envelope.message_id();
|
||||
if let Some(future) = ProtoMessageHandlerSet::handle_message(
|
||||
&this.message_handlers,
|
||||
envelope,
|
||||
this.clone().into(),
|
||||
cx.clone(),
|
||||
) {
|
||||
log::debug!("ssh message received. name:{type_name}");
|
||||
log::debug!("ssh message received. name:{type_name} id:{id}");
|
||||
match future.await {
|
||||
Ok(_) => {
|
||||
log::debug!("ssh message handled. name:{type_name}");
|
||||
log::debug!("ssh message handled. name:{type_name} id:{id}");
|
||||
}
|
||||
Err(error) => {
|
||||
log::error!(
|
||||
"error handling message. type:{type_name}, error:{error}",
|
||||
"error handling message. type:{type_name}, error:{error} id:{id}",
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1573,7 +1593,15 @@ impl ChannelClient {
|
||||
log::debug!("ssh request start. name:{}", T::NAME);
|
||||
let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
|
||||
async move {
|
||||
let response = response.await?;
|
||||
let response = futures::select! {
|
||||
response = response.fuse() => response?,
|
||||
_ = async {
|
||||
loop {
|
||||
smol::Timer::after(Duration::from_secs(3)).await;
|
||||
log::debug!("Still waiting for response. name:{}", T::NAME);
|
||||
}
|
||||
}.fuse() => unreachable!(),
|
||||
};
|
||||
log::debug!("ssh request finish. name:{}", T::NAME);
|
||||
T::Response::from_envelope(response)
|
||||
.ok_or_else(|| anyhow!("received a response of the wrong type"))
|
||||
@@ -1596,15 +1624,16 @@ impl ChannelClient {
|
||||
|
||||
pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
|
||||
log::debug!("ssh send name:{}", T::NAME);
|
||||
self.send_dynamic(payload.into_envelope(0, None, None))
|
||||
self.send_dynamic(payload.into_envelope(0, None, None), T::NAME)
|
||||
}
|
||||
|
||||
pub fn request_dynamic(
|
||||
fn request_dynamic(
|
||||
&self,
|
||||
mut envelope: proto::Envelope,
|
||||
type_name: &'static str,
|
||||
) -> impl 'static + Future<Output = Result<proto::Envelope>> {
|
||||
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
|
||||
log::debug!("ssh send_dynamic name:{} id:{}", type_name, envelope.id);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let mut response_channels_lock = self.response_channels.lock();
|
||||
response_channels_lock.insert(MessageId(envelope.id), tx);
|
||||
@@ -1624,8 +1653,9 @@ impl ChannelClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
|
||||
fn send_dynamic(&self, mut envelope: proto::Envelope, message_type: &'static str) -> Result<()> {
|
||||
envelope.id = self.next_message_id.fetch_add(1, SeqCst);
|
||||
log::debug!("ssh send name:{} id:{}", message_type, envelope.id);
|
||||
self.outgoing_tx.unbounded_send(envelope)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1640,12 +1670,12 @@ impl ProtoClient for ChannelClient {
|
||||
self.request_dynamic(envelope, request_type).boxed()
|
||||
}
|
||||
|
||||
fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
|
||||
self.send_dynamic(envelope)
|
||||
fn send(&self, envelope: proto::Envelope, message_type: &'static str) -> Result<()> {
|
||||
self.send_dynamic(envelope, message_type)
|
||||
}
|
||||
|
||||
fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
|
||||
self.send_dynamic(envelope)
|
||||
fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()> {
|
||||
self.send_dynamic(envelope, message_type)
|
||||
}
|
||||
|
||||
fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
|
||||
|
||||
@@ -31,7 +31,7 @@ env_logger.workspace = true
|
||||
fs.workspace = true
|
||||
futures.workspace = true
|
||||
git.workspace = true
|
||||
git_hosting_providers.workspace = true
|
||||
# git_hosting_providers.workspace = true
|
||||
gpui.workspace = true
|
||||
http_client.workspace = true
|
||||
language.workspace = true
|
||||
|
||||
@@ -5,7 +5,7 @@ use client::ProxySettings;
|
||||
use fs::{Fs, RealFs};
|
||||
use futures::channel::mpsc;
|
||||
use futures::{select, select_biased, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt};
|
||||
use git::GitHostingProviderRegistry;
|
||||
// use git::GitHostingProviderRegistry;
|
||||
use gpui::{AppContext, Context as _, ModelContext, UpdateGlobal as _};
|
||||
use http_client::{read_proxy_from_env, Uri};
|
||||
use language::LanguageRegistry;
|
||||
@@ -34,8 +34,61 @@ use std::{
|
||||
};
|
||||
use util::ResultExt;
|
||||
|
||||
fn init_logging_proxy() {
|
||||
env_logger::builder()
|
||||
|
||||
// fn init_logging_proxy() {
|
||||
// env_logger::builder()
|
||||
// .format(|buf, record| {
|
||||
// let mut log_record = LogRecord::new(record);
|
||||
// log_record.message = format!("(remote proxy) {}", log_record.message);
|
||||
// serde_json::to_writer(&mut *buf, &log_record)?;
|
||||
// buf.write_all(b"\n")?;
|
||||
// Ok(())
|
||||
// })
|
||||
// .init();
|
||||
// }
|
||||
|
||||
fn init_logging_proxy(log_file_path: PathBuf) -> Result<Receiver<Vec<u8>>> {
|
||||
struct MultiWrite {
|
||||
file: Box<dyn std::io::Write + Send + 'static>,
|
||||
channel: Sender<Vec<u8>>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl std::io::Write for MultiWrite {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
let written = self.file.write(buf)?;
|
||||
self.buffer.extend_from_slice(&buf[..written]);
|
||||
Ok(written)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.channel
|
||||
.send_blocking(self.buffer.clone())
|
||||
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error))?;
|
||||
self.buffer.clear();
|
||||
self.file.flush()
|
||||
}
|
||||
}
|
||||
|
||||
let log_file = Box::new(if log_file_path.exists() {
|
||||
std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&log_file_path)
|
||||
.context("Failed to open log file in append mode")?
|
||||
} else {
|
||||
std::fs::File::create(&log_file_path).context("Failed to create log file")?
|
||||
});
|
||||
|
||||
let (tx, rx) = smol::channel::unbounded();
|
||||
|
||||
let target = Box::new(MultiWrite {
|
||||
file: log_file,
|
||||
channel: tx,
|
||||
buffer: Vec::new(),
|
||||
});
|
||||
|
||||
env_logger::Builder::from_default_env()
|
||||
.target(env_logger::Target::Pipe(target))
|
||||
.format(|buf, record| {
|
||||
let mut log_record = LogRecord::new(record);
|
||||
log_record.message = format!("(remote proxy) {}", log_record.message);
|
||||
@@ -44,6 +97,8 @@ fn init_logging_proxy() {
|
||||
Ok(())
|
||||
})
|
||||
.init();
|
||||
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
fn init_logging_server(log_file_path: PathBuf) -> Result<Receiver<Vec<u8>>> {
|
||||
@@ -223,14 +278,18 @@ fn start_server(
|
||||
let message = match stdin_message {
|
||||
Ok(message) => message,
|
||||
Err(error) => {
|
||||
log::warn!("error reading message on stdin: {}. exiting.", error);
|
||||
log::warn!("error reading message on stdin: {}.", error);
|
||||
// std::process::exit(1);
|
||||
break;
|
||||
}
|
||||
};
|
||||
log::debug!("got message on stdin. forwarding to application.");
|
||||
if let Err(error) = incoming_tx.unbounded_send(message) {
|
||||
log::error!("failed to send message to application: {:?}. exiting.", error);
|
||||
// std::process::exit(1);
|
||||
return Err(anyhow!(error));
|
||||
}
|
||||
// log::debug!("forwarded message to application.");
|
||||
}
|
||||
|
||||
outgoing_message = outgoing_rx.next().fuse() => {
|
||||
@@ -253,14 +312,18 @@ fn start_server(
|
||||
|
||||
log_message = log_rx.next().fuse() => {
|
||||
if let Some(log_message) = log_message {
|
||||
// write_to_global_log("writing log message...");
|
||||
if let Err(error) = stderr_stream.write_all(&log_message).await {
|
||||
log::error!("failed to write log message to stderr: {:?}", error);
|
||||
// write_to_global_log(format!("writing log message to stderr failed: {:?}", error).as_str());
|
||||
// log::error!("failed to write log message to stderr: {:?}", error);
|
||||
break;
|
||||
}
|
||||
if let Err(error) = stderr_stream.flush().await {
|
||||
log::error!("failed to flush stderr stream: {:?}", error);
|
||||
// write_to_global_log(&format!("failed to flush stderr stream: {:?}", error));
|
||||
// log::error!("failed to flush stderr stream: {:?}", error);
|
||||
break;
|
||||
}
|
||||
// write_to_global_log("writing log message... DONE");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -315,7 +378,7 @@ pub fn execute_run(
|
||||
|
||||
log::info!("starting headless gpui app");
|
||||
|
||||
let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
|
||||
// let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
|
||||
gpui::App::headless().run(move |cx| {
|
||||
settings::init(cx);
|
||||
HeadlessProject::init(cx);
|
||||
@@ -325,8 +388,8 @@ pub fn execute_run(
|
||||
|
||||
client::init_settings(cx);
|
||||
|
||||
GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
|
||||
git_hosting_providers::init(cx);
|
||||
// GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
|
||||
// git_hosting_providers::init(cx);
|
||||
|
||||
let project = cx.new_model(|cx| {
|
||||
let fs = Arc::new(RealFs::new(Default::default(), None));
|
||||
@@ -403,7 +466,8 @@ impl ServerPaths {
|
||||
}
|
||||
|
||||
pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
|
||||
init_logging_proxy();
|
||||
let log_file = paths::remote_server_state_dir().join(&identifier).join("proxy.log");
|
||||
init_logging_proxy(log_file).log_err();
|
||||
init_panic_hook();
|
||||
|
||||
log::info!("starting proxy process. PID: {}", std::process::id());
|
||||
@@ -443,6 +507,7 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
|
||||
let mut stream = smol::net::unix::UnixStream::connect(&server_paths.stderr_socket).await?;
|
||||
let mut stderr_buffer = vec![0; 2048];
|
||||
loop {
|
||||
log::debug!("proxy reading message from stderr socket");
|
||||
match stream.read(&mut stderr_buffer).await {
|
||||
Ok(0) => {
|
||||
let error =
|
||||
@@ -598,6 +663,7 @@ where
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
loop {
|
||||
log::debug!("proxy reading message from {}", socket_name);
|
||||
read_message_raw(&mut reader, &mut buffer)
|
||||
.await
|
||||
.with_context(|| format!("failed to read message from {}", socket_name))?;
|
||||
@@ -617,6 +683,7 @@ async fn write_size_prefixed_buffer<S: AsyncWrite + Unpin>(
|
||||
buffer: &mut Vec<u8>,
|
||||
) -> Result<()> {
|
||||
let len = buffer.len() as u32;
|
||||
log::debug!("write_size_prefixed_buffer. writing len: {}", len);
|
||||
stream.write_all(len.to_le_bytes().as_slice()).await?;
|
||||
stream.write_all(buffer).await?;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user