Compare commits
1 Commits
logging
...
move-messa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db9f079598 |
136
crates/rpc/src/message_stream.rs
Normal file
136
crates/rpc/src/message_stream.rs
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
use crate::proto::{Envelope, Message};
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use prost::Message as _;
|
||||||
|
use std::{io, time::Instant};
|
||||||
|
|
||||||
|
const KIB: usize = 1024;
|
||||||
|
const MIB: usize = KIB * 1024;
|
||||||
|
const MAX_BUFFER_LEN: usize = MIB;
|
||||||
|
|
||||||
|
/// A stream of protobuf messages.
|
||||||
|
pub struct MessageStream<S> {
|
||||||
|
stream: S,
|
||||||
|
encoding_buffer: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> MessageStream<S> {
|
||||||
|
pub fn new(stream: S) -> Self {
|
||||||
|
Self {
|
||||||
|
stream,
|
||||||
|
encoding_buffer: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inner_mut(&mut self) -> &mut S {
|
||||||
|
&mut self.stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> MessageStream<S>
|
||||||
|
where
|
||||||
|
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
|
||||||
|
{
|
||||||
|
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
|
||||||
|
#[cfg(any(test, feature = "test-support"))]
|
||||||
|
const COMPRESSION_LEVEL: i32 = -7;
|
||||||
|
|
||||||
|
#[cfg(not(any(test, feature = "test-support")))]
|
||||||
|
const COMPRESSION_LEVEL: i32 = 4;
|
||||||
|
|
||||||
|
match message {
|
||||||
|
Message::Envelope(message) => {
|
||||||
|
self.encoding_buffer.reserve(message.encoded_len());
|
||||||
|
message
|
||||||
|
.encode(&mut self.encoding_buffer)
|
||||||
|
.map_err(io::Error::from)?;
|
||||||
|
let buffer =
|
||||||
|
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
self.encoding_buffer.clear();
|
||||||
|
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||||
|
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
|
||||||
|
}
|
||||||
|
Message::Ping => {
|
||||||
|
self.stream
|
||||||
|
.send(WebSocketMessage::Ping(Default::default()))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Message::Pong => {
|
||||||
|
self.stream
|
||||||
|
.send(WebSocketMessage::Pong(Default::default()))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> MessageStream<S>
|
||||||
|
where
|
||||||
|
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
|
||||||
|
{
|
||||||
|
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
|
||||||
|
while let Some(bytes) = self.stream.next().await {
|
||||||
|
let received_at = Instant::now();
|
||||||
|
match bytes? {
|
||||||
|
WebSocketMessage::Binary(bytes) => {
|
||||||
|
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
|
||||||
|
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
|
||||||
|
.map_err(io::Error::from)?;
|
||||||
|
|
||||||
|
self.encoding_buffer.clear();
|
||||||
|
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
||||||
|
return Ok((Message::Envelope(envelope), received_at));
|
||||||
|
}
|
||||||
|
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
|
||||||
|
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
|
||||||
|
WebSocketMessage::Close(_) => break,
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(anyhow!("connection closed"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::proto::{envelope, Envelope, UpdateWorktree};
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[gpui::test]
|
||||||
|
async fn test_buffer_size() {
|
||||||
|
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||||
|
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
|
||||||
|
sink.write(Message::Envelope(Envelope {
|
||||||
|
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||||
|
root_name: "abcdefg".repeat(10),
|
||||||
|
..Default::default()
|
||||||
|
})),
|
||||||
|
..Default::default()
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||||
|
sink.write(Message::Envelope(Envelope {
|
||||||
|
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
||||||
|
root_name: "abcdefg".repeat(1000000),
|
||||||
|
..Default::default()
|
||||||
|
})),
|
||||||
|
..Default::default()
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||||
|
|
||||||
|
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
|
||||||
|
stream.read().await.unwrap();
|
||||||
|
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||||
|
stream.read().await.unwrap();
|
||||||
|
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
|
use crate::{ErrorCode, ErrorCodeExt, ErrorExt, RpcError};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
|
proto::{self, AnyTypedEnvelope, EnvelopedMessage, PeerId, RequestMessage},
|
||||||
Connection,
|
Connection, MessageStream,
|
||||||
};
|
};
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use collections::HashMap;
|
use collections::HashMap;
|
||||||
|
|||||||
@@ -1,21 +1,15 @@
|
|||||||
#![allow(non_snake_case)]
|
#![allow(non_snake_case)]
|
||||||
|
|
||||||
use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
|
use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
|
||||||
use anyhow::{anyhow, Result};
|
|
||||||
use async_tungstenite::tungstenite::Message as WebSocketMessage;
|
|
||||||
use collections::HashMap;
|
use collections::HashMap;
|
||||||
use futures::{SinkExt as _, StreamExt as _};
|
|
||||||
use prost::Message as _;
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::any::{Any, TypeId};
|
|
||||||
use std::time::Instant;
|
|
||||||
use std::{
|
use std::{
|
||||||
|
any::{Any, TypeId},
|
||||||
cmp,
|
cmp,
|
||||||
fmt::Debug,
|
fmt::{self, Debug},
|
||||||
io, iter,
|
iter, mem,
|
||||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
use std::{fmt, mem};
|
|
||||||
|
|
||||||
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
|
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
|
||||||
|
|
||||||
@@ -516,16 +510,6 @@ entity_messages!(
|
|||||||
UpdateChannelBufferCollaborators,
|
UpdateChannelBufferCollaborators,
|
||||||
);
|
);
|
||||||
|
|
||||||
const KIB: usize = 1024;
|
|
||||||
const MIB: usize = KIB * 1024;
|
|
||||||
const MAX_BUFFER_LEN: usize = MIB;
|
|
||||||
|
|
||||||
/// A stream of protobuf messages.
|
|
||||||
pub struct MessageStream<S> {
|
|
||||||
stream: S,
|
|
||||||
encoding_buffer: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::large_enum_variant)]
|
#[allow(clippy::large_enum_variant)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
@@ -534,87 +518,6 @@ pub enum Message {
|
|||||||
Pong,
|
Pong,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> MessageStream<S> {
|
|
||||||
pub fn new(stream: S) -> Self {
|
|
||||||
Self {
|
|
||||||
stream,
|
|
||||||
encoding_buffer: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn inner_mut(&mut self) -> &mut S {
|
|
||||||
&mut self.stream
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> MessageStream<S>
|
|
||||||
where
|
|
||||||
S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
|
|
||||||
{
|
|
||||||
pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
|
|
||||||
#[cfg(any(test, feature = "test-support"))]
|
|
||||||
const COMPRESSION_LEVEL: i32 = -7;
|
|
||||||
|
|
||||||
#[cfg(not(any(test, feature = "test-support")))]
|
|
||||||
const COMPRESSION_LEVEL: i32 = 4;
|
|
||||||
|
|
||||||
match message {
|
|
||||||
Message::Envelope(message) => {
|
|
||||||
self.encoding_buffer.reserve(message.encoded_len());
|
|
||||||
message
|
|
||||||
.encode(&mut self.encoding_buffer)
|
|
||||||
.map_err(io::Error::from)?;
|
|
||||||
let buffer =
|
|
||||||
zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
self.encoding_buffer.clear();
|
|
||||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
|
||||||
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
|
|
||||||
}
|
|
||||||
Message::Ping => {
|
|
||||||
self.stream
|
|
||||||
.send(WebSocketMessage::Ping(Default::default()))
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
Message::Pong => {
|
|
||||||
self.stream
|
|
||||||
.send(WebSocketMessage::Pong(Default::default()))
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> MessageStream<S>
|
|
||||||
where
|
|
||||||
S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
|
|
||||||
{
|
|
||||||
pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
|
|
||||||
while let Some(bytes) = self.stream.next().await {
|
|
||||||
let received_at = Instant::now();
|
|
||||||
match bytes? {
|
|
||||||
WebSocketMessage::Binary(bytes) => {
|
|
||||||
zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
|
|
||||||
let envelope = Envelope::decode(self.encoding_buffer.as_slice())
|
|
||||||
.map_err(io::Error::from)?;
|
|
||||||
|
|
||||||
self.encoding_buffer.clear();
|
|
||||||
self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
|
|
||||||
return Ok((Message::Envelope(envelope), received_at));
|
|
||||||
}
|
|
||||||
WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
|
|
||||||
WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
|
|
||||||
WebSocketMessage::Close(_) => break,
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(anyhow!("connection closed"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Timestamp> for SystemTime {
|
impl From<Timestamp> for SystemTime {
|
||||||
fn from(val: Timestamp) -> Self {
|
fn from(val: Timestamp) -> Self {
|
||||||
UNIX_EPOCH
|
UNIX_EPOCH
|
||||||
@@ -722,38 +625,6 @@ pub fn split_worktree_update(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[gpui::test]
|
|
||||||
async fn test_buffer_size() {
|
|
||||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
|
||||||
let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
|
|
||||||
sink.write(Message::Envelope(Envelope {
|
|
||||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
|
||||||
root_name: "abcdefg".repeat(10),
|
|
||||||
..Default::default()
|
|
||||||
})),
|
|
||||||
..Default::default()
|
|
||||||
}))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
|
||||||
sink.write(Message::Envelope(Envelope {
|
|
||||||
payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
|
|
||||||
root_name: "abcdefg".repeat(1000000),
|
|
||||||
..Default::default()
|
|
||||||
})),
|
|
||||||
..Default::default()
|
|
||||||
}))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
|
||||||
|
|
||||||
let mut stream = MessageStream::new(rx.map(anyhow::Ok));
|
|
||||||
stream.read().await.unwrap();
|
|
||||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
|
||||||
stream.read().await.unwrap();
|
|
||||||
assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[gpui::test]
|
#[gpui::test]
|
||||||
fn test_converting_peer_id_from_and_to_u64() {
|
fn test_converting_peer_id_from_and_to_u64() {
|
||||||
let peer_id = PeerId {
|
let peer_id = PeerId {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ pub mod auth;
|
|||||||
mod conn;
|
mod conn;
|
||||||
mod error;
|
mod error;
|
||||||
mod extension;
|
mod extension;
|
||||||
|
mod message_stream;
|
||||||
mod notification;
|
mod notification;
|
||||||
mod peer;
|
mod peer;
|
||||||
pub mod proto;
|
pub mod proto;
|
||||||
@@ -9,6 +10,7 @@ pub mod proto;
|
|||||||
pub use conn::Connection;
|
pub use conn::Connection;
|
||||||
pub use error::*;
|
pub use error::*;
|
||||||
pub use extension::*;
|
pub use extension::*;
|
||||||
|
pub use message_stream::*;
|
||||||
pub use notification::*;
|
pub use notification::*;
|
||||||
pub use peer::*;
|
pub use peer::*;
|
||||||
mod macros;
|
mod macros;
|
||||||
|
|||||||
Reference in New Issue
Block a user