Files
p2p-chat/src/net/mod.rs
2026-02-11 21:32:20 +03:00

386 lines
14 KiB
Rust

//! Networking layer using iroh for QUIC connections and iroh-gossip for broadcast.
//!
//! This module manages:
//! - The iroh `Endpoint` and `Router`
//! - Gossip-based broadcast for chat messages
//! - Peer connection tracking
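//! - File transfer and media (voice / camera / screen) stream handling
//! - Forwarding of incoming datagrams to the application layer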
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Context, Result};
// use iroh::protocol::Router;
use iroh::Endpoint;
pub use iroh::EndpointId;
use iroh_gossip::Gossip;
use iroh_gossip::TopicId;
use n0_future::StreamExt;
use tokio::sync::{mpsc, Mutex};
use crate::protocol::{self, GossipMessage};
/// ALPN identifier for file transfers (direct peer-to-peer streams).
///
/// Registered on the endpoint in `NetworkManager::new` and used by
/// `NetworkManager::open_file_stream` for outgoing connections.
pub const FILE_TRANSFER_ALPN: &[u8] = b"p2p-chat/file/1";
/// ALPN identifier for voice chat streams.
///
/// Selected by `NetworkManager::open_media_stream` for `MediaKind::Voice`.
pub const VOICE_ALPN: &[u8] = b"p2p-chat/voice/1";
/// ALPN identifier for camera streams.
///
/// Selected by `NetworkManager::open_media_stream` for `MediaKind::Camera`.
pub const CAMERA_ALPN: &[u8] = b"p2p-chat/camera/1";
/// ALPN identifier for screen sharing streams.
///
/// Selected by `NetworkManager::open_media_stream` for `MediaKind::Screen`.
pub const SCREEN_ALPN: &[u8] = b"p2p-chat/screen/1";
/// Network events sent to the application layer.
///
/// Events are produced by the background tasks spawned in this module (the
/// gossip receiver task and the per-connection handlers) and delivered over
/// the `mpsc` channel created in `NetworkManager::new`.
#[derive(Debug)]
pub enum NetEvent {
/// A gossip message was received from a peer.
///
/// Only emitted when the payload deserializes successfully as a
/// `GossipMessage`; undecodable payloads are logged and skipped.
GossipReceived {
/// Taken from the gossip event's `delivered_from` field.
from: EndpointId,
/// The decoded message payload.
message: GossipMessage,
},
/// A peer came online (joined the gossip swarm).
///
/// Emitted when the gossip layer reports `NeighborUp` for the peer.
PeerUp(EndpointId),
/// A peer went offline (left the gossip swarm).
///
/// Emitted when the gossip layer reports `NeighborDown` for the peer.
PeerDown(EndpointId),
/// An incoming file transfer stream was accepted.
///
/// Emitted for every bi-directional stream accepted on a connection that
/// negotiated `FILE_TRANSFER_ALPN`.
IncomingFileStream {
/// Remote endpoint that opened the connection.
from: EndpointId,
// NOTE(review): presumably not read by the application yet, hence the
// lint suppression — confirm.
#[allow(dead_code)]
send: iroh::endpoint::SendStream,
// NOTE(review): presumably not read by the application yet, hence the
// lint suppression — confirm.
#[allow(dead_code)]
recv: iroh::endpoint::RecvStream,
},
/// An incoming media stream was accepted.
///
/// Emitted for every bi-directional stream accepted on a connection that
/// negotiated one of the voice, camera or screen ALPNs.
IncomingMediaStream {
/// Remote endpoint that opened the connection.
from: EndpointId,
/// Media kind derived from the connection's ALPN.
kind: protocol::MediaKind,
/// Sending half of the accepted stream.
send: iroh::endpoint::SendStream,
/// Receiving half of the accepted stream.
recv: iroh::endpoint::RecvStream,
},
/// A datagram was received from a peer.
///
/// Emitted for datagrams read on any application (non-gossip) connection.
IncomingDatagram {
/// Remote endpoint of the connection the datagram arrived on.
from: EndpointId,
/// Raw datagram payload.
data: bytes::Bytes,
},
}
/// Information about a connected peer.
///
/// Entries are created by the gossip receiver task when a `NeighborUp` event
/// arrives, with `name` and `capabilities` unset and `is_self` set to `false`.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Endpoint identifier of the peer.
pub id: EndpointId,
/// NOTE(review): presumably the peer's display name, filled in later by the
/// application — confirm against callers.
pub name: Option<String>,
/// Capabilities announced by the peer, if any have been recorded.
pub capabilities: Option<protocol::CapabilitiesMessage>,
/// NOTE(review): presumably marks the entry describing the local endpoint —
/// confirm against callers.
pub is_self: bool,
}
/// Manages the iroh networking stack.
///
/// Holds the endpoint, the gossip instance and the shared peer table. The
/// peer table sits behind an `Arc`, so clones of the manager share it.
#[derive(Clone)]
pub struct NetworkManager {
/// The bound iroh endpoint used for all outgoing and incoming connections.
pub endpoint: Endpoint,
// Incoming connections are dispatched manually by `spawn_acceptor`, so no
// iroh `Router` is stored here.
// router: Option<Router>,
/// Gossip instance spawned on `endpoint`.
pub gossip: Gossip,
/// Gossip topic derived from the `topic_bytes` passed to `new`.
topic: TopicId,
/// Sender half of the topic subscription; `None` until `join_gossip` has run.
gossip_sender: Option<iroh_gossip::api::GossipSender>,
/// Peers currently known from gossip `NeighborUp` / `NeighborDown` events.
pub peers: Arc<Mutex<HashMap<EndpointId, PeerInfo>>>,
/// Identifier of the local endpoint.
pub our_id: EndpointId,
}
impl NetworkManager {
/// Connect to a peer by ID and add it to the gossip topic.
///
/// The peer is first dialled on the gossip ALPN so that an unreachable peer
/// is reported as an error. The peer is then handed to the gossip layer:
///
/// - If the topic has already been joined via `join_gossip`, the existing
///   subscription is reused through `GossipSender::join_peers`. No extra
///   subscription or background task is created per call.
/// - Otherwise a fresh subscription is opened with the peer as bootstrap
///   node, and a detached task drains its events to keep it alive.
///
/// # Errors
///
/// Returns an error if the peer cannot be dialled or if the gossip layer
/// rejects the join / subscribe request.
pub async fn connect(&self, peer_id: EndpointId) -> Result<()> {
    // 1. Establish a transport connection. The handle is not kept; this step
    //    only verifies that the peer can be reached on the gossip ALPN.
    self.endpoint
        .connect(peer_id, iroh_gossip::ALPN)
        .await
        .context("Failed to connect to peer")?;

    // 2a. Preferred path: reuse the subscription created by `join_gossip`, so
    //     repeated calls do not pile up subscriptions and drain tasks.
    if let Some(sender) = &self.gossip_sender {
        sender
            .join_peers(vec![peer_id])
            .await
            .context("Failed to join peer on gossip topic")?;
        return Ok(());
    }

    // 2b. Fallback: the topic has not been joined yet, so subscribe with the
    //     peer as a bootstrap node.
    let (_sender, mut receiver) = self
        .gossip
        .subscribe(self.topic, vec![peer_id])
        .await
        .context("Failed to subscribe with new peer")?
        .split();
    tokio::spawn(async move {
        // Drain events to keep the subscription active. They are ignored
        // because the main subscription loop produces the `NetEvent`s.
        while let Some(_event) = receiver.next().await {}
    });
    Ok(())
}
/// Build the networking stack and start accepting connections.
///
/// Binds an iroh endpoint that advertises the file, voice, camera, screen and
/// gossip ALPNs, spawns the gossip protocol on it, and starts the background
/// acceptor task that dispatches incoming connections.
///
/// Returns the manager together with both halves of the event channel
/// (capacity 256): the sender can be passed on to `join_gossip`, the receiver
/// is where the application reads `NetEvent`s.
///
/// # Errors
///
/// Returns an error if the endpoint cannot be bound.
pub async fn new(topic_bytes: [u8; 32]) -> Result<(Self, mpsc::Sender<NetEvent>, mpsc::Receiver<NetEvent>)> {
    let (event_tx, event_rx) = mpsc::channel(256);

    // Every protocol this node is willing to accept, gossip included.
    let accepted_alpns: Vec<Vec<u8>> = [
        FILE_TRANSFER_ALPN,
        VOICE_ALPN,
        CAMERA_ALPN,
        SCREEN_ALPN,
        iroh_gossip::ALPN,
    ]
    .iter()
    .map(|alpn| alpn.to_vec())
    .collect();

    let endpoint = Endpoint::builder()
        .alpns(accepted_alpns)
        .bind()
        .await
        .context("Failed to bind iroh endpoint")?;
    let our_id = endpoint.id();

    // The gossip protocol runs on top of the same endpoint.
    let gossip = Gossip::builder().spawn(endpoint.clone());

    let manager = Self {
        endpoint,
        gossip: gossip.clone(),
        topic: TopicId::from_bytes(topic_bytes),
        gossip_sender: None,
        peers: Arc::new(Mutex::new(HashMap::new())),
        our_id,
    };

    // One acceptor task handles every ALPN and routes connections itself.
    Self::spawn_acceptor(manager.endpoint.clone(), gossip, event_tx.clone());

    Ok((manager, event_tx, event_rx))
}
/// Subscribe to the gossip topic and start receiving messages.
///
/// `bootstrap_peers` are the initial peers to connect to. The sender half of
/// the subscription is stored on the manager so that `broadcast` can use it.
/// The receiver half is moved into a background task that:
///
/// - decodes received payloads into `GossipMessage`s and forwards them as
///   `NetEvent::GossipReceived` (undecodable payloads are logged and skipped),
/// - maintains the shared peer table on `NeighborUp` / `NeighborDown` and
///   forwards `NetEvent::PeerUp` / `NetEvent::PeerDown`.
///
/// The task stops when the gossip stream ends or yields an error, or when the
/// application has dropped its event receiver.
///
/// # Errors
///
/// Returns an error if subscribing to the topic fails.
pub async fn join_gossip(
    &mut self,
    bootstrap_peers: Vec<EndpointId>,
    event_tx: mpsc::Sender<NetEvent>,
) -> Result<()> {
    let (sender, mut receiver) = self
        .gossip
        .subscribe(self.topic, bootstrap_peers)
        .await
        .context("Failed to subscribe to gossip topic")?
        .split();
    self.gossip_sender = Some(sender);
    let peers = self.peers.clone();

    // Spawn gossip event receiver
    tokio::spawn(async move {
        while let Some(event) = receiver.next().await {
            let net_event = match event {
                Ok(iroh_gossip::api::Event::Received(msg)) => {
                    // Deserialize the gossip message
                    match postcard::from_bytes::<GossipMessage>(&msg.content) {
                        Ok(message) => NetEvent::GossipReceived {
                            from: msg.delivered_from,
                            message,
                        },
                        Err(e) => {
                            tracing::warn!("Failed to deserialize gossip message: {}", e);
                            continue;
                        }
                    }
                }
                Ok(iroh_gossip::api::Event::NeighborUp(peer_id)) => {
                    // The lock guard is a temporary dropped at the end of this
                    // statement, so it is never held across the channel send.
                    peers.lock().await.entry(peer_id).or_insert_with(|| PeerInfo {
                        id: peer_id,
                        name: None,
                        capabilities: None,
                        is_self: false,
                    });
                    NetEvent::PeerUp(peer_id)
                }
                Ok(iroh_gossip::api::Event::NeighborDown(peer_id)) => {
                    peers.lock().await.remove(&peer_id);
                    NetEvent::PeerDown(peer_id)
                }
                Ok(_) => continue,
                Err(e) => {
                    tracing::error!("Gossip receiver error: {}", e);
                    break;
                }
            };

            // A failed send means the application dropped its receiver; there
            // is nobody left to notify, so stop instead of spinning forever.
            if event_tx.send(net_event).await.is_err() {
                tracing::debug!("Event channel closed; stopping gossip receiver");
                break;
            }
        }
    });
    Ok(())
}
/// Broadcast a gossip message to all peers in the topic.
pub async fn broadcast(&self, msg: &GossipMessage) -> Result<()> {
if let Some(sender) = &self.gossip_sender {
let data = postcard::to_allocvec(msg)?;
sender.broadcast(data.into()).await?;
}
Ok(())
}
/// Return the local endpoint's address, suitable for sharing with peers.
#[allow(dead_code)]
pub fn our_addr(&self) -> iroh::EndpointAddr {
    let local_endpoint = &self.endpoint;
    local_endpoint.addr()
}
/// Dial `peer_id` on the file-transfer ALPN and open one bi-directional stream.
///
/// Returns the `(send, recv)` halves of the new stream.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or the stream
/// cannot be opened.
#[allow(dead_code)]
pub async fn open_file_stream(
    &self,
    peer_id: EndpointId,
) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
    let connection = self
        .endpoint
        .connect(peer_id, FILE_TRANSFER_ALPN)
        .await
        .context("Failed to connect for file transfer")?;
    let stream_pair = connection.open_bi().await?;
    Ok(stream_pair)
}
/// Dial `peer_id` on the ALPN matching `kind` and open one bi-directional stream.
///
/// Voice, camera and screen each map to their own ALPN constant. Returns the
/// `(send, recv)` halves of the new stream.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or the stream
/// cannot be opened.
pub async fn open_media_stream(
    &self,
    peer_id: EndpointId,
    kind: protocol::MediaKind,
) -> Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
    let connection = self
        .endpoint
        .connect(
            peer_id,
            match kind {
                protocol::MediaKind::Voice => VOICE_ALPN,
                protocol::MediaKind::Camera => CAMERA_ALPN,
                protocol::MediaKind::Screen => SCREEN_ALPN,
            },
        )
        .await
        .context("Failed to connect for media stream")?;
    let stream_pair = connection.open_bi().await?;
    Ok(stream_pair)
}
/// Spawn the background task that accepts incoming connections and routes
/// each one according to its negotiated ALPN.
///
/// Every incoming connection is completed on its own task. Gossip
/// connections are handed to `gossip`; file and media connections are served
/// by `handle_app_connection`; anything else is logged and ignored. The
/// acceptor task ends once `endpoint.accept()` yields `None`.
fn spawn_acceptor(endpoint: Endpoint, gossip: Gossip, event_tx: mpsc::Sender<NetEvent>) {
    tokio::spawn(async move {
        loop {
            let Some(incoming) = endpoint.accept().await else {
                break;
            };
            let (gossip, event_tx) = (gossip.clone(), event_tx.clone());

            tokio::spawn(async move {
                // Finish the handshake; give up on this connection if it fails.
                let conn = match incoming.await {
                    Ok(conn) => conn,
                    Err(e) => {
                        tracing::warn!("Failed to accept incoming connection: {}", e);
                        return;
                    }
                };
                let alpn = conn.alpn().to_vec();
                let peer_id = conn.remote_id();

                // Gossip traffic is owned by the gossip protocol handler.
                if alpn == iroh_gossip::ALPN {
                    if let Err(e) = gossip.handle_connection(conn).await {
                        tracing::warn!("Gossip failed to handle connection: {}", e);
                    }
                    return;
                }

                let is_app_protocol = [FILE_TRANSFER_ALPN, VOICE_ALPN, CAMERA_ALPN, SCREEN_ALPN]
                    .iter()
                    .any(|known| alpn == *known);
                if !is_app_protocol {
                    tracing::warn!("Ignored connection with unknown ALPN: {:?}", String::from_utf8_lossy(&alpn));
                    return;
                }

                // Application protocols get a dedicated per-connection loop.
                tracing::info!("Accepted connection from {} with ALPN {:?}", peer_id, String::from_utf8_lossy(&alpn));
                if let Err(e) = Self::handle_app_connection(conn, alpn, event_tx).await {
                    tracing::warn!("Connection handler error for {}: {}", peer_id, e);
                }
            });
        }
    });
}
async fn handle_app_connection(conn: iroh::endpoint::Connection, alpn: Vec<u8>, event_tx: mpsc::Sender<NetEvent>) -> Result<()> {
let peer_id = conn.remote_id();
loop {
tokio::select! {
// Accept bi-directional streams (reliable)
res = conn.accept_bi() => {
match res {
Ok((send, recv)) => {
let event = if alpn == FILE_TRANSFER_ALPN {
NetEvent::IncomingFileStream {
from: peer_id,
send,
recv,
}
} else {
// Assume media
let kind = if alpn == VOICE_ALPN {
protocol::MediaKind::Voice
} else if alpn == CAMERA_ALPN {
protocol::MediaKind::Camera
} else {
protocol::MediaKind::Screen
};
NetEvent::IncomingMediaStream {
from: peer_id,
kind,
send,
recv,
}
};
if event_tx.send(event).await.is_err() {
break;
}
}
Err(e) => {
// Connection closed or error
tracing::debug!("Connection accept_bi closed for {}: {}", peer_id, e);
break;
}
}
}
// Accept datagrams (unreliable)
res = conn.read_datagram() => {
match res {
Ok(data) => {
// Dispatch datagram
if event_tx.send(NetEvent::IncomingDatagram {
from: peer_id,
data,
}).await.is_err() {
break;
}
}
Err(e) => {
tracing::debug!("Connection read_datagram closed for {}: {}", peer_id, e);
break;
}
}
}
}
}
Ok(())
}
/// Graceful shutdown.
pub async fn shutdown(self) -> Result<()> {
self.endpoint.close().await;
Ok(())
}
}