use anyhow::{Context, Result}; use p2p_chat::net::{NetEvent, NetworkManager}; use p2p_chat::protocol::{ChatMessage, GossipMessage, MediaKind}; use tokio::time::{timeout, Duration}; const TEST_TIMEOUT: Duration = Duration::from_secs(10); async fn spawn_node( topic: [u8; 32], ) -> Result<( NetworkManager, tokio::sync::mpsc::Sender, tokio::sync::mpsc::Receiver, )> { NetworkManager::new(topic).await } #[tokio::test] async fn test_p2p_connection_and_gossip() -> Result<()> { // 0. Setup let topic = [0u8; 32]; // Consistent topic for this test let (mut node_a, tx_a, mut rx_a) = spawn_node(topic).await?; let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?; let addr_a = node_a.endpoint.addr(); let id_a = addr_a.id; let id_b = node_b.endpoint.id(); println!("Node A: {}", id_a); println!("Node B: {}", id_b); // 1. Join Gossip // Node A starts alone node_a.join_gossip(vec![], tx_a.clone()).await?; // Node B joins, bootstrapping from Node A. // We force a connection first so B knows A's address. node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?; node_b.join_gossip(vec![id_a], tx_b.clone()).await?; // 2. Wait for connection (PeerUp) // Both sides should see each other let _peer_up_a = timeout(TEST_TIMEOUT, async { while let Some(event) = rx_a.recv().await { if let NetEvent::PeerUp(peer_id) = event { if peer_id == id_b { return Ok(()); } } } anyhow::bail!("Stream ended without PeerUp"); }) .await??; let _peer_up_b = timeout(TEST_TIMEOUT, async { while let Some(event) = rx_b.recv().await { if let NetEvent::PeerUp(peer_id) = event { if peer_id == id_a { return Ok(()); } } } anyhow::bail!("Stream ended without PeerUp"); }) .await??; // 3. Test Gossip Broadcast (Chat Message) let chat_msg = GossipMessage::Chat(ChatMessage { sender_name: "Node A".into(), timestamp: 12345, text: "Hello Node B!".into(), }); node_a.broadcast(&chat_msg).await?; // Node B should receive it timeout(TEST_TIMEOUT, async { while let Some(event) = rx_b.recv().await { if let NetEvent::GossipReceived { from, message } = event { assert_eq!(from, id_a); if let GossipMessage::Chat(msg) = message { assert_eq!(msg.text, "Hello Node B!"); assert_eq!(msg.sender_name, "Node A"); return Ok(()); } } } anyhow::bail!("Stream ended without GossipReceived"); }) .await??; Ok(()) } #[tokio::test] async fn test_direct_file_transfer_stream() -> Result<()> { // 0. Setup and Connect let topic = [1u8; 32]; let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?; let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?; let addr_a = node_a.endpoint.addr(); let id_a = addr_a.id; let id_b = node_b.endpoint.id(); node_a.join_gossip(vec![], tx_a).await?; node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?; node_b.join_gossip(vec![id_a], tx_b).await?; // 1. Open direct stream from A to B // node_b needs to accept the stream. In the actual app, this is handled by spawn_acceptor loop // sending NetEvent::IncomingFileStream. // A opens stream let (mut send_stream, mut _recv_stream) = node_a.open_file_stream(id_b).await?; // Send some data let test_data = b"Hello direct stream"; send_stream.write_all(test_data).await?; send_stream.finish()?; // 2. B should receive IncomingFileStream event let (mut b_recv_stream, from) = timeout(TEST_TIMEOUT, async { while let Some(event) = rx_b.recv().await { if let NetEvent::IncomingFileStream { from, recv, .. } = event { return Ok((recv, from)); } } anyhow::bail!("Stream ended without IncomingFileStream"); }) .await??; assert_eq!(from, id_a); // 3. Read data on B side let mut buf = vec![0u8; test_data.len()]; b_recv_stream.read_exact(&mut buf).await?; assert_eq!(buf, test_data); Ok(()) } #[tokio::test] async fn test_media_stream_separation() -> Result<()> { // 0. Setup and Connect let topic = [2u8; 32]; let (mut node_a, tx_a, _rx_a) = spawn_node(topic).await?; let (mut node_b, tx_b, mut rx_b) = spawn_node(topic).await?; let addr_a = node_a.endpoint.addr(); let id_a = addr_a.id; let id_b = node_b.endpoint.id(); node_a.join_gossip(vec![], tx_a).await?; node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?; node_b.join_gossip(vec![id_a], tx_b).await?; // Helper to verify stream type async fn verify_stream( node_sender: &NetworkManager, peer_id: p2p_chat::net::EndpointId, rx_receiver: &mut tokio::sync::mpsc::Receiver, kind: MediaKind, ) -> Result<()> { // Sender opens stream let (mut send, _recv) = node_sender.open_media_stream(peer_id, kind).await?; send.finish()?; // Receiver should get IncomingMediaStream with correct kind timeout(TEST_TIMEOUT, async { while let Some(event) = rx_receiver.recv().await { if let NetEvent::IncomingMediaStream { from: _, kind: k, .. } = event { assert_eq!(k, kind); return Ok(()); } } anyhow::bail!("Stream ended without IncomingMediaStream for {:?}", kind); }) .await??; Ok(()) } // 1. Verify Voice Stream verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Voice) .await .context("Voice stream failed")?; // 2. Verify Camera Stream verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Camera) .await .context("Camera stream failed")?; // 3. Verify Screen Stream verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Screen) .await .context("Screen stream failed")?; Ok(()) }