Files
p2p-chat/tests/network_tests.rs

205 lines
6.2 KiB
Rust

use anyhow::{Context, Result};
use p2p_chat::net::{NetEvent, NetworkManager};
use p2p_chat::protocol::{ChatMessage, GossipMessage, MediaKind};
use tokio::time::{timeout, Duration};
/// Upper bound passed to `timeout` for each individual wait in these tests.
const TEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Builds a [`NetworkManager`] for `topic` via `NetworkManager::new`.
///
/// Returns the manager together with the `NetEvent` sender and receiver that
/// the constructor produced. Any construction error is propagated unchanged.
async fn spawn_node(
    topic: [u8; 32],
) -> Result<(
    NetworkManager,
    tokio::sync::mpsc::Sender<NetEvent>,
    tokio::sync::mpsc::Receiver<NetEvent>,
)> {
    let (manager, events_tx, events_rx) = NetworkManager::new(topic).await?;
    Ok((manager, events_tx, events_rx))
}
#[tokio::test]
async fn test_p2p_connection_and_gossip() -> Result<()> {
// 0. Setup
let topic = [0u8; 32]; // Consistent topic for this test
let (mut node_a, tx_a, mut rx_a) = spawn_node(topic).await?;
let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
let addr_a = node_a.endpoint.addr();
let id_a = addr_a.id;
let id_b = node_b.endpoint.id();
println!("Node A: {}", id_a);
println!("Node B: {}", id_b);
// 1. Join Gossip
// Node A starts alone
node_a.join_gossip(vec![], tx_a.clone()).await?;
// Node B joins, bootstrapping from Node A.
// We force a connection first so B knows A's address.
node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
node_b.join_gossip(vec![id_a], tx_b.clone()).await?;
// 2. Wait for connection (PeerUp)
// Both sides should see each other
let _peer_up_a = timeout(TEST_TIMEOUT, async {
while let Some(event) = rx_a.recv().await {
if let NetEvent::PeerUp(peer_id) = event {
if peer_id == id_b {
return Ok(());
}
}
}
anyhow::bail!("Stream ended without PeerUp");
})
.await??;
let _peer_up_b = timeout(TEST_TIMEOUT, async {
while let Some(event) = rx_b.recv().await {
if let NetEvent::PeerUp(peer_id) = event {
if peer_id == id_a {
return Ok(());
}
}
}
anyhow::bail!("Stream ended without PeerUp");
})
.await??;
// 3. Test Gossip Broadcast (Chat Message)
let chat_msg = GossipMessage::Chat(ChatMessage {
sender_name: "Node A".into(),
timestamp: 12345,
text: "Hello Node B!".into(),
});
node_a.broadcast(&chat_msg).await?;
// Node B should receive it
timeout(TEST_TIMEOUT, async {
while let Some(event) = rx_b.recv().await {
if let NetEvent::GossipReceived { from, message } = event {
assert_eq!(from, id_a);
if let GossipMessage::Chat(msg) = message {
assert_eq!(msg.text, "Hello Node B!");
assert_eq!(msg.sender_name, "Node A");
return Ok(());
}
}
}
anyhow::bail!("Stream ended without GossipReceived");
})
.await??;
Ok(())
}
/// A file stream opened by node A towards node B must surface on B as
/// `NetEvent::IncomingFileStream` from A and deliver the bytes A wrote.
#[tokio::test]
async fn test_direct_file_transfer_stream() -> Result<()> {
    // 0. Setup and Connect
    let topic = [1u8; 32];
    let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?;
    let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
    let addr_a = node_a.endpoint.addr();
    // Read the id before `addr_a` is moved into `connect` below.
    let id_a = addr_a.id;
    let id_b = node_b.endpoint.id();
    node_a.join_gossip(vec![], tx_a).await?;
    node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
    node_b.join_gossip(vec![id_a], tx_b).await?;

    // 1. Open direct stream from A to B
    // node_b needs to accept the stream. In the actual app, this is handled by spawn_acceptor loop
    // sending NetEvent::IncomingFileStream.
    // A opens stream. The receive half is bound to a named `_` variable so it
    // stays alive until the end of the test instead of being dropped at once.
    let (mut send_stream, _recv_stream) = node_a.open_file_stream(id_b).await?;

    // Send some data
    let test_data = b"Hello direct stream";
    send_stream.write_all(test_data).await?;
    send_stream.finish()?;

    // 2. B should receive IncomingFileStream event
    let (mut b_recv_stream, from) = timeout(TEST_TIMEOUT, async {
        while let Some(event) = rx_b.recv().await {
            if let NetEvent::IncomingFileStream { from, recv, .. } = event {
                return Ok((recv, from));
            }
        }
        anyhow::bail!("Stream ended without IncomingFileStream");
    })
    .await
    .context("Node B timed out waiting for IncomingFileStream")??;
    assert_eq!(from, id_a);

    // 3. Read data on B side
    let mut buf = vec![0u8; test_data.len()];
    b_recv_stream
        .read_exact(&mut buf)
        .await
        .context("Node B failed to read the file stream payload")?;
    assert_eq!(buf, test_data);

    Ok(())
}
/// Opens a Voice, a Camera and a Screen media stream from node A to node B,
/// one after another, and checks that each one reaches B as an
/// `IncomingMediaStream` event carrying the same `MediaKind`.
#[tokio::test]
async fn test_media_stream_separation() -> Result<()> {
    /// Opens a `kind` media stream from `node_sender` to `peer_id`, finishes
    /// the send half, then waits (at most `TEST_TIMEOUT`) for the next
    /// `IncomingMediaStream` on `rx_receiver` and asserts its kind matches.
    ///
    /// Events of other variants are skipped. Errors if the channel closes
    /// before a media-stream event arrives or if the wait times out.
    async fn verify_stream(
        node_sender: &NetworkManager,
        peer_id: p2p_chat::net::EndpointId,
        rx_receiver: &mut tokio::sync::mpsc::Receiver<NetEvent>,
        kind: MediaKind,
    ) -> Result<()> {
        // The receive half stays bound until the helper returns.
        let (mut tx_half, _rx_half) = node_sender.open_media_stream(peer_id, kind).await?;
        tx_half.finish()?;

        let next_media_event = async {
            loop {
                match rx_receiver.recv().await {
                    Some(NetEvent::IncomingMediaStream { kind: seen, .. }) => {
                        assert_eq!(seen, kind);
                        return Ok(());
                    }
                    Some(_) => continue,
                    None => {
                        anyhow::bail!("Stream ended without IncomingMediaStream for {:?}", kind)
                    }
                }
            }
        };
        timeout(TEST_TIMEOUT, next_media_event).await?
    }

    // Bring both nodes up on a shared topic and connect B to A.
    let topic = [2u8; 32];
    let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?;
    let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
    let addr_a = node_a.endpoint.addr();
    let id_a = addr_a.id;
    let id_b = node_b.endpoint.id();
    node_a.join_gossip(vec![], tx_a).await?;
    node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
    node_b.join_gossip(vec![id_a], tx_b).await?;

    // Each kind is checked in turn; the message names the kind that failed.
    let cases = [
        (MediaKind::Voice, "Voice stream failed"),
        (MediaKind::Camera, "Camera stream failed"),
        (MediaKind::Screen, "Screen stream failed"),
    ];
    for (kind, failure) in cases {
        verify_stream(&node_a, id_b, &mut rx_b, kind)
            .await
            .context(failure)?;
    }

    Ok(())
}