Compare commits
1 Commits
debug-shel
...
move-messa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db9f079598 |
136
crates/rpc/src/message_stream.rs
Normal file
136
crates/rpc/src/message_stream.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use crate::proto::{Envelope, Message};
|
||||
use anyhow::{anyhow, Result};
|
||||
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use prost::Message as _;
|
||||
use std::{io, time::Instant};
|
||||
|
||||
const KIB: usize = 1024;
|
||||
const MIB: usize = KIB * 1024;
|
||||
const MAX_BUFFER_LEN: usize = MIB;
|
||||
|
||||
/// A stream of protobuf messages.
|
||||
pub struct MessageStream<S> {
|
||||
stream: S,
|
||||
encoding_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S> {
|
||||
pub fn new(stream: S) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
encoding_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner_mut(&mut self) -> &mut S {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S>
|
||||
where
|
||||
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
|
||||
{
|
||||
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
const COMPRESSION_LEVEL: i32 = -7;
|
||||
|
||||
#[cfg(not(any(test, feature = "test-support")))]
|
||||
const COMPRESSION_LEVEL: i32 = 4;
|
||||
|
||||
match message {
|
||||
Message::Envelope(message) => {
|
||||
self.encoding_buffer.reserve(message.encoded_len());
|
||||
message
|
||||
.encode(&mut self.encoding_buffer)
|
||||
.map_err(io::Error::from)?;
|
||||
let buffer =
|
||||
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
|
||||
.unwrap();
|
||||
|
||||
self.encoding_buffer.clear();
|
||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
|
||||
}
|
||||
Message::Ping => {
|
||||
self.stream
|
||||
.send(WebSocketMessage::Ping(Default::default()))
|
||||
.await?;
|
||||
}
|
||||
Message::Pong => {
|
||||
self.stream
|
||||
.send(WebSocketMessage::Pong(Default::default()))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S>
|
||||
where
|
||||
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
|
||||
{
|
||||
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
|
||||
while let Some(bytes) = self.stream.next().await {
|
||||
let received_at = Instant::now();
|
||||
match bytes? {
|
||||
WebSocketMessage::Binary(bytes) => {
|
||||
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
|
||||
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
|
||||
.map_err(io::Error::from)?;
|
||||
|
||||
self.encoding_buffer.clear();
|
||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||
return Ok((Message::Envelope(envelope), received_at));
|
||||
}
|
||||
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
|
||||
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
|
||||
WebSocketMessage::Close(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Err(anyhow!("connection closed"))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::proto::{envelope, Envelope, UpdateWorktree};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_buffer_size() {
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
|
||||
sink.write(Message::Envelope(Envelope {
|
||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||
root_name: "abcdefg".repeat(10),
|
||||
..Default::default()
|
||||
})),
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
sink.write(Message::Envelope(Envelope {
|
||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||
root_name: "abcdefg".repeat(1000000),
|
||||
..Default::default()
|
||||
})),
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
|
||||
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
|
||||
stream.read().await.unwrap();
|
||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
stream.read().await.unwrap();
|
||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
|
||||
|
||||
use super::{
|
||||
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
|
||||
Connection,
|
||||
proto::{self, AnyTypedEnvelope, EnvelopedMessage, PeerId, RequestMessage},
|
||||
Connection, MessageStream,
|
||||
};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use collections::HashMap;
|
||||
|
||||
@@ -1,21 +1,15 @@
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
|
||||
use anyhow::{anyhow, Result};
|
||||
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
||||
use collections::HashMap;
|
||||
use futures::{SinkExt as _, StreamExt as _};
|
||||
use prost::Message as _;
|
||||
use serde::Serialize;
|
||||
use std::any::{Any, TypeId};
|
||||
use std::time::Instant;
|
||||
use std::{
|
||||
any::{Any, TypeId},
|
||||
cmp,
|
||||
fmt::Debug,
|
||||
io, iter,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
fmt::{self, Debug},
|
||||
iter, mem,
|
||||
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use std::{fmt, mem};
|
||||
|
||||
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
|
||||
|
||||
@@ -516,16 +510,6 @@ entity_messages!(
|
||||
UpdateChannelBufferCollaborators,
|
||||
);
|
||||
|
||||
const KIB: usize = 1024;
|
||||
const MIB: usize = KIB * 1024;
|
||||
const MAX_BUFFER_LEN: usize = MIB;
|
||||
|
||||
/// A stream of protobuf messages.
|
||||
pub struct MessageStream<S> {
|
||||
stream: S,
|
||||
encoding_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug)]
|
||||
pub enum Message {
|
||||
@@ -534,87 +518,6 @@ pub enum Message {
|
||||
Pong,
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S> {
|
||||
pub fn new(stream: S) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
encoding_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner_mut(&mut self) -> &mut S {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S>
|
||||
where
|
||||
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
|
||||
{
|
||||
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
const COMPRESSION_LEVEL: i32 = -7;
|
||||
|
||||
#[cfg(not(any(test, feature = "test-support")))]
|
||||
const COMPRESSION_LEVEL: i32 = 4;
|
||||
|
||||
match message {
|
||||
Message::Envelope(message) => {
|
||||
self.encoding_buffer.reserve(message.encoded_len());
|
||||
message
|
||||
.encode(&mut self.encoding_buffer)
|
||||
.map_err(io::Error::from)?;
|
||||
let buffer =
|
||||
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
|
||||
.unwrap();
|
||||
|
||||
self.encoding_buffer.clear();
|
||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
|
||||
}
|
||||
Message::Ping => {
|
||||
self.stream
|
||||
.send(WebSocketMessage::Ping(Default::default()))
|
||||
.await?;
|
||||
}
|
||||
Message::Pong => {
|
||||
self.stream
|
||||
.send(WebSocketMessage::Pong(Default::default()))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> MessageStream<S>
|
||||
where
|
||||
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
|
||||
{
|
||||
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
|
||||
while let Some(bytes) = self.stream.next().await {
|
||||
let received_at = Instant::now();
|
||||
match bytes? {
|
||||
WebSocketMessage::Binary(bytes) => {
|
||||
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
|
||||
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
|
||||
.map_err(io::Error::from)?;
|
||||
|
||||
self.encoding_buffer.clear();
|
||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||
return Ok((Message::Envelope(envelope), received_at));
|
||||
}
|
||||
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
|
||||
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
|
||||
WebSocketMessage::Close(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Err(anyhow!("connection closed"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Timestamp> for SystemTime {
|
||||
fn from(val: Timestamp) -> Self {
|
||||
UNIX_EPOCH
|
||||
@@ -722,38 +625,6 @@ pub fn split_worktree_update(
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[gpui::test]
|
||||
async fn test_buffer_size() {
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
|
||||
sink.write(Message::Envelope(Envelope {
|
||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||
root_name: "abcdefg".repeat(10),
|
||||
..Default::default()
|
||||
})),
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
sink.write(Message::Envelope(Envelope {
|
||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||
root_name: "abcdefg".repeat(1000000),
|
||||
..Default::default()
|
||||
})),
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
|
||||
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
|
||||
stream.read().await.unwrap();
|
||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
stream.read().await.unwrap();
|
||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||
}
|
||||
|
||||
#[gpui::test]
|
||||
fn test_converting_peer_id_from_and_to_u64() {
|
||||
let peer_id = PeerId {
|
||||
|
||||
@@ -2,6 +2,7 @@ pub mod auth;
|
||||
mod conn;
|
||||
mod error;
|
||||
mod extension;
|
||||
mod message_stream;
|
||||
mod notification;
|
||||
mod peer;
|
||||
pub mod proto;
|
||||
@@ -9,6 +10,7 @@ pub mod proto;
|
||||
pub use conn::Connection;
|
||||
pub use error::*;
|
||||
pub use extension::*;
|
||||
pub use message_stream::*;
|
||||
pub use notification::*;
|
||||
pub use peer::*;
|
||||
mod macros;
|
||||
|
||||
Reference in New Issue
Block a user