// Listing metadata: 181 lines, 6.0 KiB, Rust
use anyhow::{Context, Result};
|
|
use p2p_chat::net::{NetEvent, NetworkManager};
|
|
use p2p_chat::protocol::{ChatMessage, GossipMessage, MediaKind};
|
|
|
|
use tokio::time::{timeout, Duration};
|
|
|
|
/// Deadline passed to each `timeout(...)` call that waits for a network event in these tests.
const TEST_TIMEOUT: Duration = Duration::from_secs(10);
|
|
|
|
async fn spawn_node(topic: [u8; 32]) -> Result<(NetworkManager, tokio::sync::mpsc::Sender<NetEvent>, tokio::sync::mpsc::Receiver<NetEvent>)> {
|
|
NetworkManager::new(topic).await
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_p2p_connection_and_gossip() -> Result<()> {
|
|
// 0. Setup
|
|
let topic = [0u8; 32]; // Consistent topic for this test
|
|
let (mut node_a, tx_a, mut rx_a) = spawn_node(topic).await?;
|
|
let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
|
|
|
|
let addr_a = node_a.endpoint.addr();
|
|
let id_a = addr_a.id;
|
|
let id_b = node_b.endpoint.id();
|
|
|
|
println!("Node A: {}", id_a);
|
|
println!("Node B: {}", id_b);
|
|
|
|
// 1. Join Gossip
|
|
// Node A starts alone
|
|
node_a.join_gossip(vec![], tx_a.clone()).await?;
|
|
|
|
// Node B joins, bootstrapping from Node A.
|
|
// We force a connection first so B knows A's address.
|
|
node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
|
|
node_b.join_gossip(vec![id_a], tx_b.clone()).await?;
|
|
|
|
// 2. Wait for connection (PeerUp)
|
|
// Both sides should see each other
|
|
let _peer_up_a = timeout(TEST_TIMEOUT, async {
|
|
while let Some(event) = rx_a.recv().await {
|
|
if let NetEvent::PeerUp(peer_id) = event {
|
|
if peer_id == id_b { return Ok(()); }
|
|
}
|
|
}
|
|
anyhow::bail!("Stream ended without PeerUp");
|
|
}).await??;
|
|
|
|
let _peer_up_b = timeout(TEST_TIMEOUT, async {
|
|
while let Some(event) = rx_b.recv().await {
|
|
if let NetEvent::PeerUp(peer_id) = event {
|
|
if peer_id == id_a { return Ok(()); }
|
|
}
|
|
}
|
|
anyhow::bail!("Stream ended without PeerUp");
|
|
}).await??;
|
|
|
|
// 3. Test Gossip Broadcast (Chat Message)
|
|
let chat_msg = GossipMessage::Chat(ChatMessage {
|
|
sender_name: "Node A".into(),
|
|
timestamp: 12345,
|
|
text: "Hello Node B!".into(),
|
|
});
|
|
|
|
node_a.broadcast(&chat_msg).await?;
|
|
|
|
// Node B should receive it
|
|
timeout(TEST_TIMEOUT, async {
|
|
while let Some(event) = rx_b.recv().await {
|
|
if let NetEvent::GossipReceived { from, message } = event {
|
|
assert_eq!(from, id_a);
|
|
if let GossipMessage::Chat(msg) = message {
|
|
assert_eq!(msg.text, "Hello Node B!");
|
|
assert_eq!(msg.sender_name, "Node A");
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
anyhow::bail!("Stream ended without GossipReceived");
|
|
}).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_direct_file_transfer_stream() -> Result<()> {
|
|
// 0. Setup and Connect
|
|
let topic = [1u8; 32];
|
|
let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?;
|
|
let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
|
|
|
|
let addr_a = node_a.endpoint.addr();
|
|
let id_a = addr_a.id;
|
|
let id_b = node_b.endpoint.id();
|
|
|
|
node_a.join_gossip(vec![], tx_a).await?;
|
|
node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
|
|
node_b.join_gossip(vec![id_a], tx_b).await?;
|
|
|
|
// 1. Open direct stream from A to B
|
|
// node_b needs to accept the stream. In the actual app, this is handled by spawn_acceptor loop
|
|
// sending NetEvent::IncomingFileStream.
|
|
|
|
// A opens stream
|
|
let (mut send_stream, mut _recv_stream) = node_a.open_file_stream(id_b).await?;
|
|
|
|
// Send some data
|
|
let test_data = b"Hello direct stream";
|
|
send_stream.write_all(test_data).await?;
|
|
send_stream.finish()?;
|
|
|
|
// 2. B should receive IncomingFileStream event
|
|
let (mut b_recv_stream, from) = timeout(TEST_TIMEOUT, async {
|
|
while let Some(event) = rx_b.recv().await {
|
|
if let NetEvent::IncomingFileStream { from, recv, .. } = event {
|
|
return Ok((recv, from));
|
|
}
|
|
}
|
|
anyhow::bail!("Stream ended without IncomingFileStream");
|
|
}).await??;
|
|
|
|
assert_eq!(from, id_a);
|
|
|
|
// 3. Read data on B side
|
|
let mut buf = vec![0u8; test_data.len()];
|
|
b_recv_stream.read_exact(&mut buf).await?;
|
|
assert_eq!(buf, test_data);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_media_stream_separation() -> Result<()> {
|
|
// 0. Setup and Connect
|
|
let topic = [2u8; 32];
|
|
let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?;
|
|
let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?;
|
|
|
|
let addr_a = node_a.endpoint.addr();
|
|
let id_a = addr_a.id;
|
|
let id_b = node_b.endpoint.id();
|
|
|
|
node_a.join_gossip(vec![], tx_a).await?;
|
|
node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?;
|
|
node_b.join_gossip(vec![id_a], tx_b).await?;
|
|
|
|
// Helper to verify stream type
|
|
async fn verify_stream(
|
|
node_sender: &NetworkManager,
|
|
peer_id: p2p_chat::net::EndpointId,
|
|
rx_receiver: &mut tokio::sync::mpsc::Receiver<NetEvent>,
|
|
kind: MediaKind,
|
|
) -> Result<()> {
|
|
// Sender opens stream
|
|
let (mut send, _recv) = node_sender.open_media_stream(peer_id, kind).await?;
|
|
send.finish()?;
|
|
|
|
// Receiver should get IncomingMediaStream with correct kind
|
|
timeout(TEST_TIMEOUT, async {
|
|
while let Some(event) = rx_receiver.recv().await {
|
|
if let NetEvent::IncomingMediaStream { from: _, kind: k, .. } = event {
|
|
assert_eq!(k, kind);
|
|
return Ok(());
|
|
}
|
|
}
|
|
anyhow::bail!("Stream ended without IncomingMediaStream for {:?}", kind);
|
|
}).await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// 1. Verify Voice Stream
|
|
verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Voice).await.context("Voice stream failed")?;
|
|
|
|
// 2. Verify Camera Stream
|
|
verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Camera).await.context("Camera stream failed")?;
|
|
|
|
// 3. Verify Screen Stream
|
|
verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Screen).await.context("Screen stream failed")?;
|
|
|
|
Ok(())
|
|
}
|