Compare commits

...

1 Commits

Author SHA1 Message Date
Nathan Sobo
db9f079598 Move MessageStream to its own module
Split the MessageStream type into a separate module to keep the proto
module focused on protobuf message definitions. The new message_stream module
contains the MessageStream type and its implementation for reading and writing
protobuf messages over a WebSocket connection.

Refactored imports and updated references to MessageStream in the peer and
proto modules to use the new path.
2024-06-09 21:06:57 -06:00
4 changed files with 144 additions and 135 deletions

View File

@@ -0,0 +1,136 @@
use crate::proto::{Envelope, Message};
use anyhow::{anyhow, Result};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use futures::{SinkExt, StreamExt};
use prost::Message as _;
use std::{io, time::Instant};
const KIB: usize = 1024;
const MIB: usize = KIB * 1024;
const MAX_BUFFER_LEN: usize = MIB;
/// A stream of protobuf messages.
pub struct MessageStream<S> {
stream: S,
encoding_buffer: Vec<u8>,
}
impl<S> MessageStream<S> {
pub fn new(stream: S) -> Self {
Self {
stream,
encoding_buffer: Vec::new(),
}
}
pub fn inner_mut(&mut self) -> &mut S {
&mut self.stream
}
}
impl<S> MessageStream<S>
where
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
{
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
#[cfg(any(test, feature = "test-support"))]
const COMPRESSION_LEVEL: i32 = -7;
#[cfg(not(any(test, feature = "test-support")))]
const COMPRESSION_LEVEL: i32 = 4;
match message {
Message::Envelope(message) => {
self.encoding_buffer.reserve(message.encoded_len());
message
.encode(&mut self.encoding_buffer)
.map_err(io::Error::from)?;
let buffer =
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
.unwrap();
self.encoding_buffer.clear();
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
}
Message::Ping => {
self.stream
.send(WebSocketMessage::Ping(Default::default()))
.await?;
}
Message::Pong => {
self.stream
.send(WebSocketMessage::Pong(Default::default()))
.await?;
}
}
Ok(())
}
}
impl<S> MessageStream<S>
where
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
{
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
while let Some(bytes) = self.stream.next().await {
let received_at = Instant::now();
match bytes? {
WebSocketMessage::Binary(bytes) => {
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
.map_err(io::Error::from)?;
self.encoding_buffer.clear();
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
return Ok((Message::Envelope(envelope), received_at));
}
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
WebSocketMessage::Close(_) => break,
_ => {}
}
}
Err(anyhow!("connection closed"))
}
}
#[cfg(test)]
mod tests {
use crate::proto::{envelope, Envelope, UpdateWorktree};
use super::*;
#[gpui::test]
async fn test_buffer_size() {
let (tx, rx) = futures::channel::mpsc::unbounded();
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
sink.write(Message::Envelope(Envelope {
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
root_name: "abcdefg".repeat(10),
..Default::default()
})),
..Default::default()
}))
.await
.unwrap();
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
sink.write(Message::Envelope(Envelope {
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
root_name: "abcdefg".repeat(1000000),
..Default::default()
})),
..Default::default()
}))
.await
.unwrap();
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
}
}

View File

@@ -1,8 +1,8 @@
use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
use super::{
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
Connection,
proto::{self, AnyTypedEnvelope, EnvelopedMessage, PeerId, RequestMessage},
Connection, MessageStream,
};
use anyhow::{anyhow, Context, Result};
use collections::HashMap;

View File

@@ -1,21 +1,15 @@
#![allow(non_snake_case)]
use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
use anyhow::{anyhow, Result};
use async_tungstenite::tungstenite::Message as WebSocketMessage;
use collections::HashMap;
use futures::{SinkExt as _, StreamExt as _};
use prost::Message as _;
use serde::Serialize;
use std::any::{Any, TypeId};
use std::time::Instant;
use std::{
any::{Any, TypeId},
cmp,
fmt::Debug,
io, iter,
time::{Duration, SystemTime, UNIX_EPOCH},
fmt::{self, Debug},
iter, mem,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use std::{fmt, mem};
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
@@ -516,16 +510,6 @@ entity_messages!(
UpdateChannelBufferCollaborators,
);
const KIB: usize = 1024;
const MIB: usize = KIB * 1024;
const MAX_BUFFER_LEN: usize = MIB;
/// A stream of protobuf messages.
pub struct MessageStream<S> {
stream: S,
encoding_buffer: Vec<u8>,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
@@ -534,87 +518,6 @@ pub enum Message {
Pong,
}
impl<S> MessageStream<S> {
pub fn new(stream: S) -> Self {
Self {
stream,
encoding_buffer: Vec::new(),
}
}
pub fn inner_mut(&mut self) -> &mut S {
&mut self.stream
}
}
impl<S> MessageStream<S>
where
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
{
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
#[cfg(any(test, feature = "test-support"))]
const COMPRESSION_LEVEL: i32 = -7;
#[cfg(not(any(test, feature = "test-support")))]
const COMPRESSION_LEVEL: i32 = 4;
match message {
Message::Envelope(message) => {
self.encoding_buffer.reserve(message.encoded_len());
message
.encode(&mut self.encoding_buffer)
.map_err(io::Error::from)?;
let buffer =
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
.unwrap();
self.encoding_buffer.clear();
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
}
Message::Ping => {
self.stream
.send(WebSocketMessage::Ping(Default::default()))
.await?;
}
Message::Pong => {
self.stream
.send(WebSocketMessage::Pong(Default::default()))
.await?;
}
}
Ok(())
}
}
impl<S> MessageStream<S>
where
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
{
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
while let Some(bytes) = self.stream.next().await {
let received_at = Instant::now();
match bytes? {
WebSocketMessage::Binary(bytes) => {
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
.map_err(io::Error::from)?;
self.encoding_buffer.clear();
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
return Ok((Message::Envelope(envelope), received_at));
}
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
WebSocketMessage::Close(_) => break,
_ => {}
}
}
Err(anyhow!("connection closed"))
}
}
impl From<Timestamp> for SystemTime {
fn from(val: Timestamp) -> Self {
UNIX_EPOCH
@@ -722,38 +625,6 @@ pub fn split_worktree_update(
mod tests {
use super::*;
#[gpui::test]
async fn test_buffer_size() {
let (tx, rx) = futures::channel::mpsc::unbounded();
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
sink.write(Message::Envelope(Envelope {
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
root_name: "abcdefg".repeat(10),
..Default::default()
})),
..Default::default()
}))
.await
.unwrap();
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
sink.write(Message::Envelope(Envelope {
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
root_name: "abcdefg".repeat(1000000),
..Default::default()
})),
..Default::default()
}))
.await
.unwrap();
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
stream.read().await.unwrap();
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
}
#[gpui::test]
fn test_converting_peer_id_from_and_to_u64() {
let peer_id = PeerId {

View File

@@ -2,6 +2,7 @@ pub mod auth;
mod conn;
mod error;
mod extension;
mod message_stream;
mod notification;
mod peer;
pub mod proto;
@@ -9,6 +10,7 @@ pub mod proto;
pub use conn::Connection;
pub use error::*;
pub use extension::*;
pub use message_stream::*;
pub use notification::*;
pub use peer::*;
mod macros;