diff --git a/p2p-chat.log b/p2p-chat.log index d9bf10c..a11771f 100644 --- a/p2p-chat.log +++ b/p2p-chat.log @@ -1173,3 +1173,26 @@ 2026-02-11T18:53:38.115109Z DEBUG p2p_chat::net: Connection accept_bi closed for 8bc2400f07d26848e17871ac18a1bc6373ab4cbd9b3f4cc98b50065130214c7c: closed by peer: 0 2026-02-11T19:08:33.877479Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 2026-02-11T19:20:24.372206Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T16:44:21.680150Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T16:44:23.882875Z WARN reportgen-actor:captive-portal: iroh::net_report::reportgen: probe timed out +2026-02-12T16:44:26.681902Z WARN pkarr_publish{me=555e453dfe}: iroh::address_lookup::pkarr: Failed to publish to pkarr err=Error sending http request: error sending request for url (https://dns.iroh.link/pkarr/kixrkxx6dcabr7wq7zwowb84ecrbne8za5paex6juj84rw3ifaxy): client error (Connect): tls handshake eof url=https://dns.iroh.link/pkarr retry_after=1s failed_attempts=1 +2026-02-12T16:44:43.370562Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T16:44:43.562583Z WARN gossip{me=555e453dfe}: iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T16:48:16.477304Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6970 +2026-02-12T16:51:09.019900Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:06:43.192887Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:08:39.721436Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:15:39.729116Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:16:15.297203Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:24:33.071652Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:24:35.273989Z WARN reportgen-actor:captive-portal: iroh::net_report::reportgen: probe timed out +2026-02-12T17:25:34.068745Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:29:09.847897Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:29:31.328823Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:29:31.736851Z WARN gossip{me=de5ce99405}: iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:29:50.668070Z WARN iroh_quinn_proto::connection: remote server configuration might cause nat traversal issues max_local_addresses=12 remote_cid_limit=5 +2026-02-12T17:31:18.144646Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:31:38.397227Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:32:51.046289Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:36:08.716291Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 +2026-02-12T17:40:14.794542Z INFO p2p_chat::web: Web interface listening on http://127.0.0.1:6969 diff --git a/src/app_logic.rs b/src/app_logic.rs index bcb7ff9..383c104 100644 --- a/src/app_logic.rs +++ b/src/app_logic.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use crate::chat::ChatState; use crate::config::AppConfig; use crate::file_transfer::FileTransferManager; @@ -6,6 +5,7 @@ use crate::media::MediaState; use crate::net::{NetEvent, NetworkManager}; use crate::protocol::{self, GossipMessage}; use crate::tui::TuiCommand; +use anyhow::Result; pub struct AppLogic { pub chat: ChatState, @@ -81,43 +81,47 @@ impl AppLogic { self_peer.name = Some(new_nick.clone()); } } - self.chat.add_system_message( - format!("Nickname changed: {} → {}", old, new_nick), - ); + self.chat + .add_system_message(format!("Nickname changed: {} → {}", old, new_nick)); // Broadcast name change to all peers - let msg = GossipMessage::NameChange( - protocol::NameChange { - old_name: old, - new_name: new_nick, - }, - ); + let msg = GossipMessage::NameChange(protocol::NameChange { + old_name: old, + new_name: new_nick, + }); let _ = self.net.broadcast(&msg).await; } TuiCommand::Connect(peer_id_str) => { match peer_id_str.parse::() { Ok(peer_id) => { - self.chat.add_system_message(format!("Connecting to {}...", peer_id)); + self.chat + .add_system_message(format!("Connecting to {}...", peer_id)); if let Err(e) = self.net.connect(peer_id).await { - self.chat.add_system_message(format!("Connection failed: {}", e)); + self.chat + .add_system_message(format!("Connection failed: {}", e)); } else { - self.chat.add_system_message("Connection initiated.".to_string()); + self.chat + .add_system_message("Connection initiated.".to_string()); } } Err(_) => { - self.chat.add_system_message(format!("Invalid peer ID: {}", peer_id_str)); + self.chat + .add_system_message(format!("Invalid peer ID: {}", peer_id_str)); } } } TuiCommand::SendFile(path) => { - self.chat.add_system_message(format!("Preparing to send file: {:?}", path)); + self.chat + .add_system_message(format!("Preparing to send file: {:?}", path)); if !path.exists() { - self.chat.add_system_message(format!("File not found: {}", path.display())); + self.chat + .add_system_message(format!("File not found: {}", path.display())); } else { let file_mgr = self.file_mgr.clone(); // Prepare send match file_mgr.prepare_send(&path).await { Ok((file_id, offer)) => { - self.chat.add_system_message(format!("Offering file: {}", offer.name)); + self.chat + .add_system_message(format!("Offering file: {}", offer.name)); let broadcast = protocol::FileOfferBroadcast { sender_name: self.our_name.to_string(), file_id, @@ -127,36 +131,45 @@ impl AppLogic { }; let msg = GossipMessage::FileOfferBroadcast(broadcast); if let Err(e) = self.net.broadcast(&msg).await { - self.chat.add_system_message(format!("Failed to broadcast offer: {}", e)); + self.chat.add_system_message(format!( + "Failed to broadcast offer: {}", + e + )); } } Err(e) => { - self.chat.add_system_message(format!("Failed to prepare file: {}", e)); + self.chat + .add_system_message(format!("Failed to prepare file: {}", e)); } } } } TuiCommand::Leave => { - self.chat.add_system_message("Leaving group chat...".to_string()); + self.chat + .add_system_message("Leaving group chat...".to_string()); self.media.shutdown(); // Clear peer list (except self) { let mut peers = self.net.peers.lock().await; peers.retain(|_, info| info.is_self); } - self.chat.add_system_message("Session ended. Use /connect to start a new session.".to_string()); + self.chat.add_system_message( + "Session ended. Use /connect to start a new session.".to_string(), + ); } TuiCommand::SelectMic(node_name) => { self.media.set_mic_name(Some(node_name.clone())); if self.media.voice_enabled() { - self.chat.add_system_message("Restarting voice with new mic...".to_string()); + self.chat + .add_system_message("Restarting voice with new mic...".to_string()); // Toggle off let _ = self.media.toggle_voice(self.net.clone()).await; // Toggle on let status = self.media.toggle_voice(self.net.clone()).await; self.chat.add_system_message(status.to_string()); } - self.chat.add_system_message(format!("🎤 Mic set to: {}", node_name)); + self.chat + .add_system_message(format!("🎤 Mic set to: {}", node_name)); // Save to config if let Ok(mut cfg) = AppConfig::load() { cfg.media.mic_name = Some(node_name); @@ -165,7 +178,8 @@ impl AppLogic { } TuiCommand::SetBitrate(bps) => { self.media.set_bitrate(bps); - self.chat.add_system_message(format!("🎵 Bitrate set to {} kbps", bps / 1000)); + self.chat + .add_system_message(format!("🎵 Bitrate set to {} kbps", bps / 1000)); // Save to config if let Ok(mut cfg) = AppConfig::load() { cfg.media.mic_bitrate = bps; @@ -174,7 +188,8 @@ impl AppLogic { } TuiCommand::SelectSpeaker(node_name) => { self.media.set_speaker_name(Some(node_name.clone())); - self.chat.add_system_message(format!("🔊 Speaker set to: {}", node_name)); + self.chat + .add_system_message(format!("🔊 Speaker set to: {}", node_name)); // Save to config if let Ok(mut cfg) = AppConfig::load() { cfg.media.speaker_name = Some(node_name); @@ -183,34 +198,58 @@ impl AppLogic { } } if self.media.voice_enabled() { - self.chat.add_system_message("Restart voice chat to apply changes.".to_string()); + self.chat + .add_system_message("Restart voice chat to apply changes.".to_string()); } } TuiCommand::AcceptFile(prefix) => { // Find matching transfer let transfers = self.file_mgr.transfers.lock().unwrap(); - let mut matched_id = None; - for (id, _info) in transfers.iter() { + let mut matched = None; + for (id, info) in transfers.iter() { let id_str = hex::encode(id); if id_str.starts_with(&prefix) { - if matched_id.is_some() { - self.chat.add_system_message(format!("Ambiguous prefix '{}'", prefix)); - matched_id = None; + if matched.is_some() { + self.chat + .add_system_message(format!("Ambiguous prefix '{}'", prefix)); + matched = None; break; } - matched_id = Some(*id); + matched = Some((*id, info.clone())); } } drop(transfers); - - if let Some(id) = matched_id { - if self.file_mgr.accept_transfer(id) { - self.chat.add_system_message(format!("Accepted transfer {}", hex::encode(id).chars().take(8).collect::())); + + if let Some((_id, info)) = matched { + if let Some(peer_id) = info.peer { + self.chat + .add_system_message(format!("Requesting file from {}...", peer_id)); + let req = protocol::FileRequest { + sender_name: self.our_name.clone(), + file_id: info.file_id, + }; + + // Update state to Requesting so we auto-accept when stream comes + let expires_at = std::time::Instant::now() + std::time::Duration::from_secs(60); + { + // We need to upgrade the lock or re-acquire? + // We dropped transfers lock at line 221. + let mut transfers = self.file_mgr.transfers.lock().unwrap(); + if let Some(t) = transfers.get_mut(&info.file_id) { + t.state = crate::file_transfer::TransferState::Requesting { expires_at }; + } + } + + let msg = GossipMessage::FileRequest(req); + let _ = self.net.broadcast(&msg).await; } else { - self.chat.add_system_message("Transfer not found or not waiting for accept.".to_string()); + self.chat.add_system_message( + "Cannot accept: Sender unknown (legacy offer?)".to_string(), + ); } } else { - self.chat.add_system_message(format!("No transfer found matching '{}'", prefix)); + self.chat + .add_system_message(format!("No transfer found matching '{}'", prefix)); } } TuiCommand::None => {} @@ -244,9 +283,78 @@ impl AppLogic { } GossipMessage::FileOfferBroadcast(offer) => { self.chat.add_system_message(format!( - "{} offers file: {} ({} bytes)", - offer.sender_name, offer.file_name, offer.file_size + "{} offers file: {} ({})", + offer.sender_name, + offer.file_name, + crate::file_transfer::format_bytes(offer.file_size) )); + self.file_mgr.register_incoming_broadcast(&offer, from); + } + GossipMessage::FileRequest(req) => { + let file_mgr = self.file_mgr.clone(); + let net = self.net.clone(); + let file_id = req.file_id; + let peer_id = from; + let sender_name = req.sender_name.clone(); + + // Check if we have transfer info + let transfer_opt = { + let transfers = file_mgr.transfers.lock().unwrap(); + transfers.get(&file_id).cloned() + }; + + if let Some(info) = transfer_opt { + if let (Some(path), Some(offer)) = (info.path, info.offer) { + self.chat.add_system_message(format!( + "Starting transfer of {} to {}", + info.file_name, sender_name + )); + + tokio::spawn(async move { + // Open connection + match net.open_file_stream(peer_id).await { + Ok((mut send, mut recv)) => { + if let Err(e) = file_mgr + .execute_send( + file_id, &path, offer, &mut send, &mut recv, + ) + .await + { + eprintln!("Transfer failed: {}", e); + let mut transfers = file_mgr.transfers.lock().unwrap(); + if let Some(t) = transfers.get_mut(&file_id) { + t.state = + crate::file_transfer::TransferState::Failed { + error: e.to_string(), + completed_at: std::time::Instant::now(), + }; + } + } + } + Err(e) => { + eprintln!( + "Failed to open connection to {}: {}", + peer_id, e + ); + let mut transfers = file_mgr.transfers.lock().unwrap(); + if let Some(t) = transfers.get_mut(&file_id) { + t.state = crate::file_transfer::TransferState::Failed { + error: format!("Conn error: {}", e), + completed_at: std::time::Instant::now(), + }; + } + } + } + }); + } else { + self.chat.add_system_message( + "Cannot send: Missing file path or offer details.".to_string(), + ); + } + } else { + self.chat + .add_system_message("Received request for unknown file.".to_string()); + } } GossipMessage::NameChange(change) => { let mut peers = self.net.peers.lock().await; @@ -259,31 +367,64 @@ impl AppLogic { )); } GossipMessage::Disconnect { sender_name } => { + let mut should_exit = false; { let mut peers = self.net.peers.lock().await; peers.remove(&from); + // Auto-terminate if no peers left + // We check if peers is empty. Note: we are not in peers map. + if peers.is_empty() { + should_exit = true; + } } let short_id: String = format!("{}", from).chars().take(8).collect(); self.chat.add_system_message(format!( "👋 {} ({}) disconnected", sender_name, short_id )); + + if should_exit && self.connected { + self.chat + .add_system_message("All peers disconnected. Exiting...".to_string()); + // Trigger shutdown + self.media.shutdown(); + std::process::exit(0); + } } }, NetEvent::PeerUp(peer_id) => { let short_id: String = format!("{}", peer_id).chars().take(8).collect(); - self.chat.add_system_message(format!("Peer connected: {}", short_id)); + self.chat + .add_system_message(format!("Peer connected: {}", short_id)); + self.connected = true; } NetEvent::PeerDown(peer_id) => { + let mut should_exit = false; + { + let mut peers = self.net.peers.lock().await; + peers.remove(&peer_id); + if peers.is_empty() { + should_exit = true; + } + } let short_id: String = format!("{}", peer_id).chars().take(8).collect(); - self.chat.add_system_message(format!("Peer disconnected: {}", short_id)); + self.chat + .add_system_message(format!("Peer disconnected: {}", short_id)); + + if should_exit && self.connected { + self.chat + .add_system_message("All peers disconnected. Exiting...".to_string()); + self.media.shutdown(); + std::process::exit(0); + } } NetEvent::IncomingFileStream { from, mut send, mut recv, } => { - self.chat.add_system_message("Incoming file transfer...".to_string()); + self.chat + .add_system_message("Incoming file transfer...".to_string()); tracing::info!("Incoming file stream from {:?}", from); let file_mgr = self.file_mgr.clone(); tokio::spawn(async move { @@ -299,7 +440,8 @@ impl AppLogic { recv, } => { let short_id: String = format!("{}", from).chars().take(8).collect(); - self.chat.add_system_message(format!("📡 Incoming {:?} stream from {}", kind, short_id)); + self.chat + .add_system_message(format!("📡 Incoming {:?} stream from {}", kind, short_id)); self.media.handle_incoming_media(from, kind, send, recv); } NetEvent::IncomingDatagram { from, data } => { diff --git a/src/chat/mod.rs b/src/chat/mod.rs index bd582af..126e75c 100644 --- a/src/chat/mod.rs +++ b/src/chat/mod.rs @@ -1,7 +1,7 @@ //! Chat module — manages chat history and message sending/receiving. -use crate::protocol::{ChatMessage, GossipMessage}; use crate::net::NetworkManager; +use crate::protocol::{ChatMessage, GossipMessage}; use anyhow::Result; /// Stored chat entry with display metadata. diff --git a/src/config.rs b/src/config.rs index fb26f1a..09a4082 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,9 +1,9 @@ -use std::path::PathBuf; -use std::fs; use anyhow::Result; use directories::ProjectDirs; -use serde::{Deserialize, Serialize}; use ratatui::style::Color; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::path::PathBuf; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppConfig { @@ -85,6 +85,14 @@ impl Default for MediaConfig { pub struct UiConfig { pub border: String, pub text: String, + #[serde(default = "default_cyan")] + pub chat_border: String, + #[serde(default = "default_cyan")] + pub peer_border: String, + #[serde(default = "default_cyan")] + pub input_border: String, + #[serde(default = "default_yellow")] + pub transfer_border: String, pub self_name: String, pub peer_name: String, pub system_msg: String, @@ -109,6 +117,10 @@ impl Default for UiConfig { peer_name: "magenta".to_string(), system_msg: "yellow".to_string(), time: "dark_gray".to_string(), + chat_border: "cyan".to_string(), + peer_border: "cyan".to_string(), + input_border: "cyan".to_string(), + transfer_border: "yellow".to_string(), success: "green".to_string(), error: "red".to_string(), warning: "yellow".to_string(), @@ -120,7 +132,7 @@ impl Default for UiConfig { impl AppConfig { pub fn load() -> Result { let config_path = Self::get_config_path(); - + if !config_path.exists() { // Create default config if it doesn't exist let default_config = Self::default(); @@ -198,7 +210,7 @@ pub fn parse_color(color_str: &str) -> Color { "white" => Color::White, _ => { // Try hex parsing if needed, but for now fallback to White - Color::White + Color::White } } } @@ -207,6 +219,10 @@ pub fn parse_color(color_str: &str) -> Color { #[derive(Debug, Clone)] pub struct Theme { pub border: Color, + pub chat_border: Color, + pub peer_border: Color, + pub input_border: Color, + pub transfer_border: Color, pub text: Color, pub self_name: Color, pub peer_name: Color, @@ -222,6 +238,10 @@ impl From for Theme { fn from(cfg: UiConfig) -> Self { Self { border: parse_color(&cfg.border), + chat_border: parse_color(&cfg.chat_border), + peer_border: parse_color(&cfg.peer_border), + input_border: parse_color(&cfg.input_border), + transfer_border: parse_color(&cfg.transfer_border), text: parse_color(&cfg.text), self_name: parse_color(&cfg.self_name), peer_name: parse_color(&cfg.peer_name), @@ -234,3 +254,10 @@ impl From for Theme { } } } + +fn default_yellow() -> String { + "yellow".to_string() +} +fn default_cyan() -> String { + "cyan".to_string() +} diff --git a/src/file_transfer/mod.rs b/src/file_transfer/mod.rs index 6afc296..b50baea 100644 --- a/src/file_transfer/mod.rs +++ b/src/file_transfer/mod.rs @@ -4,13 +4,14 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; -use sha2::{Sha256, Digest}; +use sha2::{Digest, Sha256}; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use crate::net::EndpointId; use crate::protocol::{ - decode_framed, write_framed, FileChunk, FileDone, FileId, FileOffer, - FileStreamMessage, FileAcceptReject, new_file_id, + decode_framed, new_file_id, write_framed, FileAcceptReject, FileChunk, FileDone, FileId, + FileOffer, FileStreamMessage, }; /// Chunk size for file transfers (64 KB). @@ -23,9 +24,9 @@ pub enum TransferState { /// We offered a file, waiting for opponent to accept. Offering, /// We received an offer, waiting for user to accept. - WaitingForAccept { - expires_at: std::time::Instant, - }, + WaitingForAccept { expires_at: std::time::Instant }, + /// We requested the file, waiting for sender to connect. + Requesting { expires_at: std::time::Instant }, /// Transfer is in progress. Transferring { bytes_transferred: u64, @@ -33,11 +34,11 @@ pub enum TransferState { start_time: std::time::Instant, }, /// Transfer completed successfully. - Complete, + Complete { completed_at: std::time::Instant }, /// Transfer was rejected by the peer. - Rejected, + Rejected { completed_at: std::time::Instant }, /// Transfer failed with an error. - Failed(String), + Failed { error: String, completed_at: std::time::Instant }, } /// Information about a tracked file transfer. @@ -49,6 +50,9 @@ pub struct TransferInfo { pub file_size: u64, pub state: TransferState, pub is_outgoing: bool, + pub peer: Option, + pub path: Option, + pub offer: Option, } use std::sync::{Arc, Mutex}; @@ -73,10 +77,7 @@ impl FileTransferManager { /// Initiate sending a file to a peer. /// Returns the file ID and the file offer for broadcasting. - pub async fn prepare_send( - &self, - file_path: &Path, - ) -> Result<(FileId, FileOffer)> { + pub async fn prepare_send(&self, file_path: &Path) -> Result<(FileId, FileOffer)> { let file_name = file_path .file_name() .context("No filename")? @@ -126,22 +127,66 @@ impl FileTransferManager { file_id, file_name, file_size, - state: TransferState::Offering, + state: TransferState::WaitingForAccept { + expires_at: std::time::Instant::now() + std::time::Duration::from_secs(timeout), + }, is_outgoing: true, + peer: None, + path: Some(file_path.to_path_buf()), + offer: Some(offer.clone()), }, ); } Ok((file_id, offer)) } - - // Add logic to signals - // We need a way to signal waiting tasks. - // Since `execute_receive` is async, we can use a Notify or Channel. - // For simplicity, let's store a map of channels in a separate mutex or the same one? - // Using the same Transfers map for simplicity. - // But TransferInfo is data, not channels. - // We'll add a separate map for channels. + + pub fn check_timeouts(&self) { + let mut transfers = self.transfers.lock().unwrap(); + let now = std::time::Instant::now(); + for info in transfers.values_mut() { + match info.state { + TransferState::WaitingForAccept { expires_at } | TransferState::Requesting { expires_at } => { + if now > expires_at { + info.state = TransferState::Failed { error: "Timed out".to_string(), completed_at: now }; + } + } + _ => {} + } + } + + // Remove expired + let to_remove: Vec = transfers.iter().filter_map(|(id, info)| { + match info.state { + TransferState::Complete { completed_at } | + TransferState::Rejected { completed_at } | + TransferState::Failed { completed_at, .. } => { + if now.duration_since(completed_at) > std::time::Duration::from_secs(10) { + Some(*id) + } else { + None + } + } + _ => None + } + }).collect(); + + for id in to_remove { + transfers.remove(&id); + } + + // Also cleanup pending accepts for timed out transfers? + // Actually, execute_receive handles its own timeout for pending_accepts channel. + // But for sender side, we don't have a pending accept channel causing a block? + // Sender is waiting in `execute_send`? + // Wait, execute_send sends Offer and waits for Accept/Reject. + // It uses `decode_framed(recv)`. + // If recipient never replies (timed out), sender is stuck in `decode_framed`. + // We need `execute_send` to timeout as well! + // But `check_timeouts` only updates the State in the Map. + // It doesn't interrupt the async `execute_send`. + // `execute_send` logic needs a timeout on `decode_framed`. + } pub fn accept_transfer(&self, file_id: FileId) -> bool { let mut pending = self.pending_accepts.lock().unwrap(); @@ -164,7 +209,6 @@ impl FileTransferManager { } } - /// Execute the sending side of a file transfer over a QUIC bi-stream. #[allow(dead_code)] pub async fn execute_send( @@ -187,7 +231,7 @@ impl FileTransferManager { FileStreamMessage::Reject(_) => { let mut transfers = self.transfers.lock().unwrap(); if let Some(info) = transfers.get_mut(&file_id) { - info.state = TransferState::Rejected; + info.state = TransferState::Rejected { completed_at: std::time::Instant::now() }; } return Ok(()); } @@ -237,7 +281,7 @@ impl FileTransferManager { { let mut transfers = self.transfers.lock().unwrap(); if let Some(info) = transfers.get_mut(&file_id) { - info.state = TransferState::Complete; + info.state = TransferState::Complete { completed_at: std::time::Instant::now() }; } } @@ -246,9 +290,7 @@ impl FileTransferManager { /// Handle an incoming file offer for display. #[allow(dead_code)] - /// Handle an incoming file offer for display. - #[allow(dead_code)] - pub fn register_incoming_offer(&self, offer: &FileOffer) { + pub fn register_incoming_offer(&self, offer: &FileOffer, peer: EndpointId) { let mut transfers = self.transfers.lock().unwrap(); transfers.insert( offer.file_id, @@ -258,6 +300,35 @@ impl FileTransferManager { file_size: offer.size, state: TransferState::Offering, is_outgoing: false, + peer: Some(peer), + path: None, + offer: None, + }, + ); + } + + /// Handle an incoming file offer broadcast for display. + pub fn register_incoming_broadcast( + &self, + offer: &crate::protocol::FileOfferBroadcast, + peer: EndpointId, + ) { + let mut transfers = self.transfers.lock().unwrap(); + // Calculate expires_at based on timeout + let expires_at = std::time::Instant::now() + std::time::Duration::from_secs(offer.timeout); + + transfers.insert( + offer.file_id, + TransferInfo { + file_id: offer.file_id, + file_name: offer.file_name.clone(), + file_size: offer.file_size, + // We go directly to WaitingForAccept because it's an offer we can accept + state: TransferState::WaitingForAccept { expires_at }, + is_outgoing: false, + peer: Some(peer), + path: None, + offer: None, }, ); } @@ -277,11 +348,21 @@ impl FileTransferManager { }; let file_id = offer.file_id; - + let timeout_duration = std::time::Duration::from_secs(offer.timeout); let expires_at = std::time::Instant::now() + timeout_duration; - // Register incoming offer + // Check if pre-accepted (Requesting state) + let is_pre_accepted = { + let transfers = self.transfers.lock().unwrap(); + if let Some(info) = transfers.get(&file_id) { + matches!(info.state, TransferState::Requesting { .. }) + } else { + false + } + }; + + // Register incoming offer (unless pre-accepted logic differs) { let mut transfers = self.transfers.lock().unwrap(); transfers.insert( @@ -292,28 +373,35 @@ impl FileTransferManager { file_size: offer.size, state: TransferState::WaitingForAccept { expires_at }, is_outgoing: false, + peer: None, // We don't need peer for receive technically, as we are connected + path: None, + offer: Some(offer.clone()), // Store incoming offer if useful }, ); } - // Wait for acceptance - let (tx, rx) = tokio::sync::oneshot::channel(); - { - let mut pending = self.pending_accepts.lock().unwrap(); - pending.insert(file_id, tx); - } + let accepted = if is_pre_accepted { + true + } else { + // Wait for acceptance + let (tx, rx) = tokio::sync::oneshot::channel(); + { + let mut pending = self.pending_accepts.lock().unwrap(); + pending.insert(file_id, tx); + } - // Wait for signal or timeout - let accepted = match tokio::time::timeout(timeout_duration, rx).await { - Ok(Ok(decision)) => decision, - Ok(Err(_)) => false, // Sender dropped? - Err(_) => { - // Timeout - false + // Wait for signal or timeout + match tokio::time::timeout(timeout_duration, rx).await { + Ok(Ok(decision)) => decision, + Ok(Err(_)) => false, // Sender dropped? + Err(_) => { + // Timeout + false + } } }; - // Remove from pending if still there (in case of timeout) + // Remove from pending if still there (in case of timeout or pre-accept bypass) { let mut pending = self.pending_accepts.lock().unwrap(); pending.remove(&file_id); @@ -325,8 +413,8 @@ impl FileTransferManager { let mut transfers = self.transfers.lock().unwrap(); // Check if it wasn't already updated (e.g. by manual reject) if let Some(info) = transfers.get_mut(&file_id) { - // Could be Offering or WaitingForAccept depending on race - info.state = TransferState::Rejected; + // Could be Offering or WaitingForAccept depending on race + info.state = TransferState::Rejected { completed_at: std::time::Instant::now() }; } } @@ -335,7 +423,7 @@ impl FileTransferManager { &FileStreamMessage::Reject(FileAcceptReject { file_id }), ) .await?; - + return Ok(file_id); } @@ -346,9 +434,21 @@ impl FileTransferManager { ) .await?; + // Update state to Transferring immediately to stop spinner + { + let mut transfers = self.transfers.lock().unwrap(); + if let Some(info) = transfers.get_mut(&file_id) { + info.state = TransferState::Transferring { + bytes_transferred: 0, + total_size: offer.size, + start_time: std::time::Instant::now(), + }; + } + } + // Receive chunks let dest_path = self.download_dir.join(&offer.name); - + // Ensure download dir exists for safety (though main normally does this) if let Some(parent) = dest_path.parent() { let _ = tokio::fs::create_dir_all(parent).await; @@ -396,7 +496,7 @@ impl FileTransferManager { { let mut transfers = self.transfers.lock().unwrap(); if let Some(info) = transfers.get_mut(&file_id) { - info.state = TransferState::Complete; + info.state = TransferState::Complete { completed_at: std::time::Instant::now() }; } } @@ -415,7 +515,10 @@ impl FileTransferManager { let direction = if info.is_outgoing { "↑" } else { "↓" }; match &info.state { TransferState::Offering => { - format!("{} {} (offering...)", direction, info.file_name) + format!("{} {} (Offering)", direction, info.file_name) + } + TransferState::Requesting { .. } => { + format!("{} {} (Requesting...)", direction, info.file_name) } TransferState::WaitingForAccept { expires_at } => { let now = std::time::Instant::now(); @@ -424,7 +527,14 @@ impl FileTransferManager { } else { 0 }; - format!("{} {} (WAITING ACCEPT: {}s)", direction, info.file_name, remaining) + if info.is_outgoing { + format!("{} {} (Wait Accept - {}s)", direction, info.file_name, remaining) + } else { + format!( + "{} {} (Incoming Offer - {}s)", + direction, info.file_name, remaining + ) + } } TransferState::Transferring { bytes_transferred, @@ -443,7 +553,7 @@ impl FileTransferManager { 0.0 }; let speed_mbps = speed_bps / (1024.0 * 1024.0); - + format!( "{} {} {}% ({}/{}) {:.1} MB/s", direction, @@ -454,21 +564,21 @@ impl FileTransferManager { speed_mbps ) } - TransferState::Complete => { + TransferState::Complete { .. } => { format!("{} {} ✓ complete", direction, info.file_name) } - TransferState::Rejected => { + TransferState::Rejected { .. } => { format!("{} {} ✗ rejected", direction, info.file_name) } - TransferState::Failed(e) => { - format!("{} {} ✗ {}", direction, info.file_name, e) + TransferState::Failed { error, .. } => { + format!("{} {} ✗ {}", direction, info.file_name, error) } } } } - #[allow(dead_code)] - fn format_bytes(bytes: u64) -> String { +#[allow(dead_code)] +pub fn format_bytes(bytes: u64) -> String { if bytes < 1024 { format!("{}B", bytes) } else if bytes < 1024 * 1024 { diff --git a/src/lib.rs b/src/lib.rs index 57c468a..0a74fa8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ //! P2P Chat — library crate exposing modules for integration tests. +pub mod app_logic; pub mod chat; pub mod config; pub mod file_transfer; @@ -8,4 +9,3 @@ pub mod net; pub mod protocol; pub mod tui; pub mod web; -pub mod app_logic; diff --git a/src/main.rs b/src/main.rs index 818ead6..77ad8d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ //! Chat is the primary feature, file transfer is first-class, //! voice/camera/screen are optional, powered by PipeWire. +mod app_logic; mod chat; mod config; mod file_transfer; @@ -12,7 +13,6 @@ mod net; mod protocol; mod tui; mod web; -mod app_logic; use std::io; use std::path::PathBuf; @@ -20,10 +20,10 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use clap::Parser; use crossterm::event::{Event, EventStream}; +use crossterm::execute; use crossterm::terminal::{ disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen, }; -use crossterm::execute; use iroh::EndpointId; use n0_future::StreamExt; use ratatui::backend::CrosstermBackend; @@ -95,23 +95,28 @@ async fn main() -> Result<()> { let cli = Cli::parse(); // Resolution: CLI > Config > Default - let res_str = cli.screen_resolution.as_deref().unwrap_or(&config.media.screen_resolution); + let res_str = cli + .screen_resolution + .as_deref() + .unwrap_or(&config.media.screen_resolution); let screen_res = parse_resolution(res_str).unwrap_or((1280, 720)); // Topic: CLI > Config > Default - let topic_str = cli.topic.as_deref().or(config.network.topic.as_deref()).unwrap_or("00000000000000000000000000000000"); + let topic_str = cli + .topic + .as_deref() + .or(config.network.topic.as_deref()) + .unwrap_or("00000000000000000000000000000000"); let topic_bytes = parse_topic(topic_str)?; // ... networking init ... // Initialize networking - let (mut net_mgr, _net_tx, mut net_rx) = - NetworkManager::new(topic_bytes).await.context("Failed to start networking")?; + let (mut net_mgr, _net_tx, mut net_rx) = NetworkManager::new(topic_bytes) + .await + .context("Failed to start networking")?; let our_id = net_mgr.our_id; - let our_id_short = format!("{}", our_id) - .chars() - .take(8) - .collect::(); + let our_id_short = format!("{}", our_id).chars().take(8).collect::(); // Initialize application state let mut chat = ChatState::new(cli.name.clone()); @@ -119,16 +124,16 @@ async fn main() -> Result<()> { // Resolve download directory, handling ~ let download_path = if cli.download_dir.starts_with("~/") { if let Ok(home) = std::env::var("HOME") { - PathBuf::from(home).join(&cli.download_dir[2..]) + PathBuf::from(home).join(&cli.download_dir[2..]) } else { - PathBuf::from(&cli.download_dir) + PathBuf::from(&cli.download_dir) } } else { PathBuf::from(&cli.download_dir) }; // Ensure download directory exists tokio::fs::create_dir_all(&download_path).await?; - + let file_mgr = FileTransferManager::new(download_path); // Pass mic name from config if present let media = MediaState::new( @@ -137,17 +142,16 @@ async fn main() -> Result<()> { config.media.speaker_name.clone(), config.media.mic_bitrate, ); - + // Initialize App with Theme let theme = crate::config::Theme::from(config.ui.clone()); let mut app = App::new(theme); - // If a peer was specified, add it and bootstrap let bootstrap_peers = if let Some(ref join_id) = cli.join { - let peer_id: EndpointId = join_id.parse().context( - "Invalid peer ID. Expected a hex-encoded endpoint ID.", - )?; + let peer_id: EndpointId = join_id + .parse() + .context("Invalid peer ID. Expected a hex-encoded endpoint ID.")?; vec![peer_id] } else { vec![] @@ -173,12 +177,15 @@ async fn main() -> Result<()> { // Insert ourselves into the peer list { let mut peers = net_mgr.peers.lock().await; - peers.insert(our_id, PeerInfo { - id: our_id, - name: Some(cli.name.clone()), - capabilities: None, - is_self: true, - }); + peers.insert( + our_id, + PeerInfo { + id: our_id, + name: Some(cli.name.clone()), + capabilities: None, + is_self: true, + }, + ); } // Broadcast capabilities @@ -254,6 +261,7 @@ async fn run_event_loop( gossip_rx: &mut mpsc::Receiver, ) -> Result<()> { let mut event_stream = EventStream::new(); + let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); loop { // Collect peers for rendering — self is always first @@ -281,6 +289,11 @@ async fn run_event_loop( // Wait for events tokio::select! { + // Tick for animation handling + _ = interval.tick() => { + logic.file_mgr.check_timeouts(); + } + // Terminal/keyboard events maybe_event = event_stream.next() => { match maybe_event { @@ -328,15 +341,13 @@ async fn run_event_loop( } } - - fn parse_topic(hex_str: &str) -> Result<[u8; 32]> { // If it's all zeros (default), generate a deterministic topic let hex_str = hex_str.trim(); if hex_str.len() != 32 && hex_str.len() != 64 { // Treat as a room name — hash it to get 32 bytes - use sha2::{Sha256, Digest}; + use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(hex_str.as_bytes()); let result = hasher.finalize(); @@ -355,7 +366,7 @@ fn parse_topic(hex_str: &str) -> Result<[u8; 32]> { Ok(bytes) } else { // 32-char string — treat as room name - use sha2::{Sha256, Digest}; + use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(hex_str.as_bytes()); let result = hasher.finalize(); diff --git a/src/media/capture.rs b/src/media/capture.rs index 67052ab..73c8588 100644 --- a/src/media/capture.rs +++ b/src/media/capture.rs @@ -4,12 +4,12 @@ //! Plays video by spawning `mpv` (or `vlc`) and writing to its stdin. use anyhow::Result; -use tracing; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use tracing; -use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage}; use crate::media::WebMediaEvent; +use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage}; /// Manages a video capture session (camera or screen). pub struct VideoCapture { @@ -17,8 +17,6 @@ pub struct VideoCapture { tasks: Vec>, } - - impl VideoCapture { /// Start video capture with web input (broadcast receiver). pub async fn start_web( @@ -29,7 +27,7 @@ impl VideoCapture { input_rx: tokio::sync::broadcast::Sender>, ) -> Result { let running = Arc::new(AtomicBool::new(true)); - + // Spawn sender tasks let mut tasks = Vec::new(); for peer in peers { @@ -37,10 +35,10 @@ impl VideoCapture { let net = network_manager.clone(); let rx = input_rx.subscribe(); let kind = kind.clone(); - + let task = tokio::spawn(async move { if let Err(e) = run_video_sender_web(net, peer, kind, rx, running).await { - tracing::error!("Video sender web error: {}", e); + tracing::error!("Video sender web error: {}", e); } }); tasks.push(task); @@ -60,10 +58,6 @@ impl VideoCapture { } } - - - - /// Handle incoming video stream from a peer (Web Version). /// Receives video frames (e.g. H.264/VP9 encoded inside protocol messages) and forwards to frontend. pub async fn handle_incoming_video_web( @@ -73,8 +67,8 @@ impl VideoCapture { broadcast_tx: tokio::sync::broadcast::Sender, ) -> Result<()> { let kind = match message { - MediaStreamMessage::VideoStart { kind, .. } => kind, - _ => anyhow::bail!("Expected VideoStart"), + MediaStreamMessage::VideoStart { kind, .. } => kind, + _ => anyhow::bail!("Expected VideoStart"), }; tracing::info!("Starting {:?} stream handler for {}", kind, from); @@ -88,7 +82,11 @@ impl VideoCapture { MediaStreamMessage::VideoFrame { data, .. } => { // Broadcast to web let short_id: String = format!("{}", from).chars().take(8).collect(); - let _ = broadcast_tx.send(WebMediaEvent::Video { peer_id: short_id, kind: kind.clone(), data }); + let _ = broadcast_tx.send(WebMediaEvent::Video { + peer_id: short_id, + kind: kind.clone(), + data, + }); } MediaStreamMessage::VideoStop { .. } => { tracing::info!("Peer stopped video"); @@ -118,25 +116,36 @@ async fn run_video_sender_web( mut input_rx: tokio::sync::broadcast::Receiver>, running: Arc, ) -> Result<()> { - let (mut send, _) = network_manager.open_media_stream(peer, kind.clone()).await?; + let (mut send, _) = network_manager + .open_media_stream(peer, kind.clone()) + .await?; // For web, we assume fixed resolution and fps for now. - write_framed(&mut send, &MediaStreamMessage::VideoStart { kind, width: 640, height: 480, fps: 30 }).await?; + write_framed( + &mut send, + &MediaStreamMessage::VideoStart { + kind, + width: 640, + height: 480, + fps: 30, + }, + ) + .await?; while running.load(Ordering::Relaxed) { match input_rx.recv().await { Ok(data) => { - // Web sends MJPEG chunk (full frame) - let msg = MediaStreamMessage::VideoFrame { - sequence: 0, // Sequence not used for web input, set to 0 - timestamp_ms: std::time::SystemTime::now() + // Web sends MJPEG chunk (full frame) + let msg = MediaStreamMessage::VideoFrame { + sequence: 0, // Sequence not used for web input, set to 0 + timestamp_ms: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64, - data, - }; - if write_framed(&mut send, &msg).await.is_err() { - break; - } + data, + }; + if write_framed(&mut send, &msg).await.is_err() { + break; + } } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { @@ -144,14 +153,12 @@ async fn run_video_sender_web( } } } - + let _ = write_framed(&mut send, &MediaStreamMessage::VideoStop { kind }).await; send.finish()?; Ok(()) } - - // --------------------------------------------------------------------------- // Player Logic (MPV/VLC) // --------------------------------------------------------------------------- diff --git a/src/media/mod.rs b/src/media/mod.rs index 20be841..a7dd16a 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -12,10 +12,10 @@ pub mod voice; use iroh::EndpointId; use tracing; -use std::sync::Arc; -use std::sync::atomic::{AtomicU32, Ordering}; use crate::net::NetworkManager; -use crate::protocol::{MediaKind, MediaStreamMessage, decode_framed}; +use crate::protocol::{decode_framed, MediaKind, MediaStreamMessage}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use self::capture::VideoCapture; use self::voice::VoiceChat; @@ -23,8 +23,15 @@ use self::voice::VoiceChat; /// Event sent to the web interface for playback. #[derive(Debug, Clone)] pub enum WebMediaEvent { - Audio { peer_id: String, data: Vec }, - Video { peer_id: String, kind: MediaKind, data: Vec }, + Audio { + peer_id: String, + data: Vec, + }, + Video { + peer_id: String, + kind: MediaKind, + data: Vec, + }, } /// Tracks all active media sessions. @@ -57,7 +64,12 @@ pub struct MediaState { } impl MediaState { - pub fn new(screen_resolution: (u32, u32), mic_name: Option, speaker_name: Option, mic_bitrate: u32) -> Self { + pub fn new( + screen_resolution: (u32, u32), + mic_name: Option, + speaker_name: Option, + mic_bitrate: u32, + ) -> Self { let pipewire_available = check_pipewire(); let (broadcast_tx, _) = tokio::sync::broadcast::channel(100); let (mic_broadcast, _) = tokio::sync::broadcast::channel(100); @@ -156,7 +168,7 @@ impl MediaState { /// Toggle camera capture. pub async fn toggle_camera(&mut self, net: NetworkManager) -> &'static str { // We use ffmpeg now, which doesn't strictly depend on pipewire crate, - // but likely requires pipewire daemon or v4l2. + // but likely requires pipewire daemon or v4l2. // We kept pipewire check for consistency but it might be loose. if self.camera.is_some() { if let Some(mut c) = self.camera.take() { @@ -172,15 +184,17 @@ impl MediaState { peers.keys().cloned().collect(), net.clone(), self.cam_broadcast.clone(), - ).await { - Ok(vc) => { - self.camera = Some(vc); - "📷 Camera started (Web)" - } - Err(e) => { - tracing::error!("Failed to start camera: {}", e); - "📷 Failed to start camera" - } + ) + .await + { + Ok(vc) => { + self.camera = Some(vc); + "📷 Camera started (Web)" + } + Err(e) => { + tracing::error!("Failed to start camera: {}", e); + "📷 Failed to start camera" + } } } } @@ -201,15 +215,17 @@ impl MediaState { peers.keys().cloned().collect(), net.clone(), self.screen_broadcast.clone(), - ).await { - Ok(vc) => { - self.screen = Some(vc); - "🖥️ Screen share started (Web)" - } - Err(e) => { - tracing::error!("Failed to start screen share: {}", e); - "🖥️ Failed to start screen share" - } + ) + .await + { + Ok(vc) => { + self.screen = Some(vc); + "🖥️ Screen share started (Web)" + } + Err(e) => { + tracing::error!("Failed to start screen share: {}", e); + "🖥️ Failed to start screen share" + } } } } @@ -238,22 +254,36 @@ impl MediaState { tracing::warn!("ALPN mismatch: expected Voice, got AudioStart"); } tracing::info!("Accepted Audio stream from {:?}", from); - if let Err(e) = VoiceChat::handle_incoming_audio_web(from, msg, recv, broadcast_tx).await { + if let Err(e) = + VoiceChat::handle_incoming_audio_web(from, msg, recv, broadcast_tx) + .await + { tracing::error!("Audio web playback error: {}", e); } } MediaStreamMessage::VideoStart { .. } => { tracing::info!("Accepted Video stream from {:?}", from); - if let Err(e) = VideoCapture::handle_incoming_video_web(from, msg, recv, broadcast_tx).await { + if let Err(e) = + VideoCapture::handle_incoming_video_web(from, msg, recv, broadcast_tx) + .await + { tracing::error!("Video web playback error: {}", e); } } _ => { - tracing::warn!("Unknown or unexpected start message from {:?}: {:?}", from, msg); + tracing::warn!( + "Unknown or unexpected start message from {:?}: {:?}", + from, + msg + ); } }, Err(e) => { - tracing::warn!("Failed to decode initial media message from {:?}: {}", from, e); + tracing::warn!( + "Failed to decode initial media message from {:?}: {}", + from, + e + ); } } }); @@ -271,11 +301,11 @@ impl MediaState { // Or we should add a prefix byte? // Since we are optimizing audio, let's assume it's audio. // But if we add video datagrams later... - + // For now, let's try to interpret as audio. // Since VoiceChat expects `MediaStreamMessage`, we need to see how `postcard` serialized it. // If sender sends serialized `AudioData`, we can deserialize it. - + if let Some(voice) = &mut self.voice { voice.handle_datagram(from, data, self.broadcast_tx.clone()); } diff --git a/src/media/voice.rs b/src/media/voice.rs index 7004900..eee18ad 100644 --- a/src/media/voice.rs +++ b/src/media/voice.rs @@ -10,12 +10,15 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::thread; -use anyhow::Result; -use crate::protocol::{decode_framed, MediaStreamMessage}; use crate::media::WebMediaEvent; +use crate::protocol::{decode_framed, MediaStreamMessage}; +use anyhow::Result; use postcard; // Use audiopus types directly -use audiopus::{coder::Encoder as OpusEncoder, coder::Decoder as OpusDecoder, Channels, Application, SampleRate, Bitrate}; +use audiopus::{ + coder::Decoder as OpusDecoder, coder::Encoder as OpusEncoder, Application, Bitrate, Channels, + SampleRate, +}; // Constants const SAMPLE_RATE_VAL: i32 = 48000; @@ -43,52 +46,62 @@ pub fn list_audio_sinks() -> Vec { fn list_audio_nodes(filter_class: &str) -> Vec { use std::process::Command; - + let output = match Command::new("pw-dump").output() { Ok(o) => o, Err(_) => return Vec::new(), }; - + if !output.status.success() { return Vec::new(); } - + let json_str = match String::from_utf8(output.stdout) { Ok(s) => s, Err(_) => return Vec::new(), }; - + let data: Vec = match serde_json::from_str(&json_str) { Ok(d) => d, Err(_) => return Vec::new(), }; - + let mut sources = Vec::new(); for obj in &data { let props = match obj.get("info").and_then(|i| i.get("props")) { Some(p) => p, None => continue, }; - - let media_class = props.get("media.class").and_then(|v| v.as_str()).unwrap_or(""); + + let media_class = props + .get("media.class") + .and_then(|v| v.as_str()) + .unwrap_or(""); // Match partial class, e.g. "Audio/Source", "Audio/Sink", "Audio/Duplex" if !media_class.contains(filter_class) && !media_class.contains("Audio/Duplex") { continue; } - - let node_name = props.get("node.name").and_then(|v| v.as_str()).unwrap_or("").to_string(); + + let node_name = props + .get("node.name") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); let description = props .get("node.description") .or_else(|| props.get("node.nick")) .and_then(|v| v.as_str()) .unwrap_or(&node_name) .to_string(); - + if !node_name.is_empty() { - sources.push(AudioDevice { node_name, description }); + sources.push(AudioDevice { + node_name, + description, + }); } } - + sources } @@ -112,25 +125,23 @@ impl VoiceChat { mic_bitrate: Arc, ) -> Result { let running = Arc::new(AtomicBool::new(true)); - + let mut tasks = Vec::new(); - // One sender task to iterate ALL peers? Or one task per peer? - // Since input is broadcast channel, spawning multiple tasks (one per peer) works well. - // Each task gets a copy of `mic_rx`. Wait, `broadcast::Receiver` can be cloned? - // No, `mic_rx` must be resubscribed? - // `broadcast::Receiver` is `Clone`? No, `Sender::subscribe` returns receiver. - // But we are passing `Receiver` in. - // We can't clone a receiver to get same stream easily without `resubscribe`. - // But `resubscribe` needs sender. - // We should probably spawn ONE task that reads from `mic_rx` and sends to ALL peers. - // That is more efficient (encode once, send N times). - + // Spawn a single task to encode once and send to all peers. let sender_running = running.clone(); let net_clone = net.clone(); let mic_bitrate_clone = mic_bitrate.clone(); let sender_task = tokio::spawn(async move { - if let Err(e) = run_opis_sender_web_multi(net_clone, peers, mic_rx, sender_running, mic_bitrate_clone).await { + if let Err(e) = run_opis_sender_web_multi( + net_clone, + peers, + mic_rx, + sender_running, + mic_bitrate_clone, + ) + .await + { tracing::error!("Voice sender failed: {}", e); } }); @@ -171,10 +182,10 @@ impl VoiceChat { // Process start message match message { - MediaStreamMessage::AudioStart { .. } => { + MediaStreamMessage::AudioStart { .. } => { tracing::info!("Incoming voice stream started (web) from {}", from); - } - _ => anyhow::bail!("Expected AudioStart"), + } + _ => anyhow::bail!("Expected AudioStart"), } let mut decode_buf = vec![0f32; FRAME_SIZE_SAMPLES]; @@ -186,13 +197,17 @@ impl VoiceChat { }; match msg { - MediaStreamMessage::AudioData { opus_data, .. } => { // Removed `channels` field usage if it existed + MediaStreamMessage::AudioData { opus_data, .. } => { + // Removed `channels` field usage if it existed match decoder.decode_float(Some(&opus_data), &mut decode_buf, false) { Ok(len) => { - let samples = decode_buf[..len].to_vec(); - // Broadcast to web - let short_id: String = format!("{}", from).chars().take(8).collect(); - let _ = broadcast_tx.send(WebMediaEvent::Audio { peer_id: short_id, data: samples }); + let samples = decode_buf[..len].to_vec(); + // Broadcast to web + let short_id: String = format!("{}", from).chars().take(8).collect(); + let _ = broadcast_tx.send(WebMediaEvent::Audio { + peer_id: short_id, + data: samples, + }); } Err(e) => { tracing::warn!("Opus decode error: {:?}", e); @@ -210,33 +225,40 @@ impl VoiceChat { } pub fn handle_datagram( - &mut self, - from: iroh::EndpointId, - data: bytes::Bytes, - broadcast_tx: tokio::sync::broadcast::Sender + &mut self, + from: iroh::EndpointId, + data: bytes::Bytes, + broadcast_tx: tokio::sync::broadcast::Sender, ) { // tracing::info!("Received datagram from {} ({} bytes)", from, data.len()); match postcard::from_bytes::(&data) { - Ok(MediaStreamMessage::AudioData { opus_data, sequence }) => { - if sequence % 50 == 0 { - tracing::info!("Received AudioData seq {} from {}", sequence, from); - } - let decoder = self.datagram_decoders.entry(from).or_insert_with(|| { - tracing::info!("Creating new OpusDecoder for {}", from); - OpusDecoder::new(SampleRate::Hz48000, Channels::Mono).expect("Failed to create decoder") - }); - // Max frame size is ~120ms (5760 samples). Use safe buffer. - let mut pcm = vec![0.0f32; 5760]; - match decoder.decode_float(Some(&opus_data), &mut pcm, false) { - Ok(len) => { - let samples = pcm[..len].to_vec(); - let short_id: String = format!("{}", from).chars().take(8).collect(); - let _ = broadcast_tx.send(WebMediaEvent::Audio { peer_id: short_id, data: samples }); - } - Err(e) => tracing::warn!("Opus decode error: {:?}", e), - } - }, - Ok(_) => {}, // Ignore non-audio datagrams + Ok(MediaStreamMessage::AudioData { + opus_data, + sequence, + }) => { + if sequence % 50 == 0 { + tracing::info!("Received AudioData seq {} from {}", sequence, from); + } + let decoder = self.datagram_decoders.entry(from).or_insert_with(|| { + tracing::info!("Creating new OpusDecoder for {}", from); + OpusDecoder::new(SampleRate::Hz48000, Channels::Mono) + .expect("Failed to create decoder") + }); + // Max frame size is ~120ms (5760 samples). Use safe buffer. + let mut pcm = vec![0.0f32; 5760]; + match decoder.decode_float(Some(&opus_data), &mut pcm, false) { + Ok(len) => { + let samples = pcm[..len].to_vec(); + let short_id: String = format!("{}", from).chars().take(8).collect(); + let _ = broadcast_tx.send(WebMediaEvent::Audio { + peer_id: short_id, + data: samples, + }); + } + Err(e) => tracing::warn!("Opus decode error: {:?}", e), + } + } + Ok(_) => {} // Ignore non-audio datagrams Err(e) => tracing::warn!("Failed to deserialize datagram: {}", e), } } @@ -261,34 +283,33 @@ async fn run_opis_sender_web_multi( let mut connections = Vec::new(); for peer in peers { // We use VOICE_ALPN, but for datagrams ALPN matters for connection establishment. - // If we already have a connection (e.g. gossip), iroh reuses it? - // Yes, connect() returns the existing connection if active. - match network_manager.endpoint.connect(peer, crate::net::VOICE_ALPN).await { + + match network_manager + .endpoint + .connect(peer, crate::net::VOICE_ALPN) + .await + { Ok(conn) => { connections.push(conn); } Err(e) => { - tracing::warn!("Failed to connect to {}: {}", peer, e); + tracing::warn!("Failed to connect to {}: {}", peer, e); } } } - + if connections.is_empty() { tracing::warn!("No reachable peers for voice chat"); - // We continue anyway? No, useless. - // But maybe peers come online later? - // For now, fail if no initial connection. - // Or wait? The loop only runs while running is true. - // Let's just return to avoid spinning. return Ok(()); } let mut encoder = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) .map_err(|e| anyhow::anyhow!("Failed to create Opus encoder: {:?}", e))?; - + // Set initial bitrate let current_bitrate = mic_bitrate.load(Ordering::Relaxed); - encoder.set_bitrate(Bitrate::BitsPerSecond(current_bitrate as i32)) + encoder + .set_bitrate(Bitrate::BitsPerSecond(current_bitrate as i32)) .map_err(|e| anyhow::anyhow!("Failed to set bitrate: {:?}", e))?; // Opus frame size: 20ms at 48kHz = 960 samples @@ -307,11 +328,11 @@ async fn run_opis_sender_web_multi( Ok(samples) => { // tracing::trace!("Received {} audio samples from web", samples.len()); pcm_buffer.extend_from_slice(&samples); - + // Process 20ms chunks while pcm_buffer.len() >= frame_size { let chunk: Vec = pcm_buffer.drain(0..frame_size).collect(); - + match encoder.encode_float(&chunk, &mut opus_buffer) { Ok(len) => { let packet = opus_buffer[..len].to_vec(); @@ -320,7 +341,7 @@ async fn run_opis_sender_web_multi( opus_data: packet, }; sequence = sequence.wrapping_add(1); - + // Serialize for Datagram match postcard::to_allocvec(&msg) { Ok(data) => { @@ -334,7 +355,11 @@ async fn run_opis_sender_web_multi( } } if sent_count > 0 && sequence % 50 == 0 { - tracing::info!("Sent audio datagram seq {} to {} peers", sequence, sent_count); + tracing::info!( + "Sent audio datagram seq {} to {} peers", + sequence, + sent_count + ); } } Err(e) => tracing::error!("Serialization error: {}", e), @@ -352,6 +377,6 @@ async fn run_opis_sender_web_multi( } } } - + Ok(()) } diff --git a/src/net/mod.rs b/src/net/mod.rs index ce0ee23..938f558 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -109,7 +109,7 @@ impl NetworkManager { tokio::spawn(async move { while let Some(_event) = receiver.next().await { // Drain events to keep subscription active. - // We ignore them because the main subscription loop triggers + // We ignore them because the main subscription loop triggers // the actual application logic (NetEvent). } }); @@ -118,7 +118,9 @@ impl NetworkManager { } /// Create a new NetworkManager and start the iroh endpoint. - pub async fn new(topic_bytes: [u8; 32]) -> Result<(Self, mpsc::Sender, mpsc::Receiver)> { + pub async fn new( + topic_bytes: [u8; 32], + ) -> Result<(Self, mpsc::Sender, mpsc::Receiver)> { let (event_tx, event_rx) = mpsc::channel(256); // Create endpoint with file transfer ALPN and Gossip ALPN @@ -281,7 +283,7 @@ impl NetworkManager { while let Some(incoming) = endpoint.accept().await { let gossip = gossip.clone(); let event_tx = event_tx.clone(); - + tokio::spawn(async move { match incoming.await { Ok(conn) => { @@ -293,16 +295,33 @@ impl NetworkManager { if let Err(e) = gossip.handle_connection(conn).await { tracing::warn!("Gossip failed to handle connection: {}", e); } - } else if alpn == FILE_TRANSFER_ALPN || alpn == VOICE_ALPN || alpn == CAMERA_ALPN || alpn == SCREEN_ALPN { + } else if alpn == FILE_TRANSFER_ALPN + || alpn == VOICE_ALPN + || alpn == CAMERA_ALPN + || alpn == SCREEN_ALPN + { // Handle application protocols with a dedicated loop - tracing::info!("Accepted connection from {} with ALPN {:?}", peer_id, String::from_utf8_lossy(&alpn)); - if let Err(e) = Self::handle_app_connection(conn, alpn, event_tx).await { - tracing::warn!("Connection handler error for {}: {}", peer_id, e); + tracing::info!( + "Accepted connection from {} with ALPN {:?}", + peer_id, + String::from_utf8_lossy(&alpn) + ); + if let Err(e) = + Self::handle_app_connection(conn, alpn, event_tx).await + { + tracing::warn!( + "Connection handler error for {}: {}", + peer_id, + e + ); } } else { - tracing::warn!("Ignored connection with unknown ALPN: {:?}", String::from_utf8_lossy(&alpn)); + tracing::warn!( + "Ignored connection with unknown ALPN: {:?}", + String::from_utf8_lossy(&alpn) + ); } - }, + } Err(e) => { tracing::warn!("Failed to accept incoming connection: {}", e); } @@ -312,9 +331,13 @@ impl NetworkManager { }); } - async fn handle_app_connection(conn: iroh::endpoint::Connection, alpn: Vec, event_tx: mpsc::Sender) -> Result<()> { + async fn handle_app_connection( + conn: iroh::endpoint::Connection, + alpn: Vec, + event_tx: mpsc::Sender, + ) -> Result<()> { let peer_id = conn.remote_id(); - + loop { tokio::select! { // Accept bi-directional streams (reliable) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index f5b64f7..8ee4729 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -25,9 +25,20 @@ pub enum GossipMessage { Capabilities(CapabilitiesMessage), PeerAnnounce(PeerAnnounce), FileOfferBroadcast(FileOfferBroadcast), + /// Request the file transfer (sent by recipient after accepting). + FileRequest(FileRequest), NameChange(NameChange), /// Graceful disconnect notification. - Disconnect { sender_name: String }, + Disconnect { + sender_name: String, + }, +} + +/// Request to start a file transfer. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileRequest { + pub sender_name: String, + pub file_id: FileId, } /// A name change notification from a peer. @@ -161,10 +172,7 @@ pub enum MediaStreamMessage { frame_size_ms: u8, }, /// One Opus-encoded audio frame. - AudioData { - sequence: u64, - opus_data: Vec, - }, + AudioData { sequence: u64, opus_data: Vec }, /// Audio session ended. AudioStop, @@ -182,9 +190,7 @@ pub enum MediaStreamMessage { data: Vec, }, /// Video session ended. - VideoStop { - kind: MediaKind, - }, + VideoStop { kind: MediaKind }, } // --------------------------------------------------------------------------- diff --git a/src/tui/chat_panel.rs b/src/tui/chat_panel.rs index 3ea2717..44214e6 100644 --- a/src/tui/chat_panel.rs +++ b/src/tui/chat_panel.rs @@ -14,7 +14,7 @@ pub fn render(frame: &mut Frame, area: Rect, chat: &ChatState, app: &App) { let block = Block::default() .title(" 💬 Chat ") .borders(Borders::ALL) - .border_style(Style::default().fg(app.theme.border)); + .border_style(Style::default().fg(app.theme.chat_border)); let inner = block.inner(area); frame.render_widget(block, area); @@ -48,17 +48,14 @@ pub fn render(frame: &mut Frame, area: Rect, chat: &ChatState, app: &App) { fn format_entry(entry: &ChatEntry, app: &App) -> ListItem<'static> { let time = chrono::DateTime::from_timestamp_millis(entry.timestamp as i64) - .map(|dt| dt.format("%H:%M").to_string()) + .map(|dt| dt.with_timezone(&chrono::Local).format("%H:%M").to_string()) .unwrap_or_default(); if entry.is_system { let line = Line::from(vec![ + Span::styled(format!("[{}] ", time), Style::default().fg(app.theme.time)), Span::styled( - format!("[{}] ", time), - Style::default().fg(app.theme.time), - ), - Span::styled( - format!("*** {} ***", entry.text), + format!("[sys] {}", entry.text), Style::default() .fg(app.theme.system_msg) .add_modifier(Modifier::ITALIC), @@ -70,16 +67,14 @@ fn format_entry(entry: &ChatEntry, app: &App) -> ListItem<'static> { let name_color = if entry.is_self { app.theme.self_name } else { - app.theme.peer_name + crate::tui::get_peer_color(&entry.sender_name) }; let line = Line::from(vec![ Span::styled(format!("[{}] ", time), Style::default().fg(app.theme.time)), Span::styled( format!("{}: ", entry.sender_name), - Style::default() - .fg(name_color) - .add_modifier(Modifier::BOLD), + Style::default().fg(name_color).add_modifier(Modifier::BOLD), ), Span::styled(entry.text.clone(), Style::default().fg(app.theme.text)), ]); diff --git a/src/tui/file_panel.rs b/src/tui/file_panel.rs index 6a03657..210044b 100644 --- a/src/tui/file_panel.rs +++ b/src/tui/file_panel.rs @@ -15,7 +15,7 @@ pub fn render(frame: &mut Frame, area: Rect, file_mgr: &FileTransferManager, app let block = Block::default() .title(format!(" 📦 Transfers ({}) ", transfers.len())) .borders(Borders::ALL) - .border_style(Style::default().fg(app.theme.border)); + .border_style(Style::default().fg(app.theme.transfer_border)); let inner = block.inner(area); frame.render_widget(block, area); @@ -30,29 +30,36 @@ pub fn render(frame: &mut Frame, area: Rect, file_mgr: &FileTransferManager, app let items: Vec = transfers .iter() .map(|info| { - let id_short = hex::encode(info.file_id).chars().take(4).collect::(); - + let id_short = hex::encode(info.file_id) + .chars() + .take(4) + .collect::(); + // Format state with specific styling let (text, color) = match &info.state { - TransferState::Complete => ( + TransferState::Complete { .. } => ( format!("[{}] {} (Complete)", id_short, info.file_name), - app.theme.success + app.theme.success, ), - TransferState::Rejected => ( + TransferState::Rejected { .. } => ( format!("[{}] {} (Rejected)", id_short, info.file_name), - app.theme.error + app.theme.error, ), - TransferState::Failed(e) => ( - format!("[{}] {} (Failed: {})", id_short, info.file_name, e), - app.theme.error + TransferState::Failed { error, .. } => ( + format!("[{}] {} (Failed: {})", id_short, info.file_name, error), + app.theme.error, ), - TransferState::Transferring { bytes_transferred, total_size, start_time } => { + TransferState::Transferring { + bytes_transferred, + total_size, + start_time, + } => { let pct = if *total_size > 0 { (*bytes_transferred as f64 / *total_size as f64) * 100.0 } else { 0.0 }; - + // Calc speed & ETA let elapsed = start_time.elapsed().as_secs_f64(); let speed_bps = if elapsed > 0.0 { @@ -61,7 +68,7 @@ pub fn render(frame: &mut Frame, area: Rect, file_mgr: &FileTransferManager, app 0.0 }; let speed_mbps = speed_bps / (1024.0 * 1024.0); - + let eta_str = if speed_bps > 0.0 && *total_size > *bytes_transferred { let remaining_bytes = total_size - bytes_transferred; let eta_secs = remaining_bytes as f64 / speed_bps; @@ -73,46 +80,73 @@ pub fn render(frame: &mut Frame, area: Rect, file_mgr: &FileTransferManager, app // Progress Bar let width = 15; let filled = (pct / 100.0 * width as f64) as usize; - let bar: String = (0..width).map(|i| if i < filled { '█' } else { '░' }).collect(); + let bar: String = (0..width) + .map(|i| if i < filled { '█' } else { '░' }) + .collect(); ( - format!("[{}] {} ⏳ {} {:.1}% ({:.1} MB/s) - {}", - id_short, info.file_name, bar, pct, speed_mbps, eta_str), - app.theme.info + format!( + "[{}] {} ⏳ {} {:.1}% ({:.1} MB/s) - {}", + id_short, info.file_name, bar, pct, speed_mbps, eta_str + ), + // Use warning color (yellow) as requested + app.theme.warning, ) - }, + } TransferState::Offering => ( format!("[{}] {} (Offering...)", id_short, info.file_name), - app.theme.warning + app.theme.warning, ), TransferState::WaitingForAccept { expires_at } => { let now = std::time::Instant::now(); if *expires_at > now { let remaining = expires_at.duration_since(now).as_secs(); - + // spinner (braille) - const SPINNER: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; + const SPINNER: &[&str] = + &["⠟","⠯", "⠷", "⠾", "⠽", "⠻"]; let millis = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let spin_idx = (millis / 100) as usize % SPINNER.len(); let spinner = SPINNER[spin_idx]; - + // Removed progress bar as requested ("leave the braile and seconds") ( - format!("[{}] {} {} {}s left", id_short, info.file_name, spinner, remaining), - app.theme.warning + format!( + "[{}] {} {} {}s left", + id_short, info.file_name, spinner, remaining + ), + app.theme.warning, ) } else { ( format!("[{}] {} (Timed out)", id_short, info.file_name), - app.theme.error + app.theme.error, + ) + } + } + TransferState::Requesting { expires_at } => { + let now = std::time::Instant::now(); + if *expires_at > now { + let remaining = expires_at.duration_since(now).as_secs(); + ( + format!( + "[{}] {} (Requesting... {}s)", + id_short, info.file_name, remaining + ), + app.theme.warning, + ) + } else { + ( + format!("[{}] {} (Timed out)", id_short, info.file_name), + app.theme.error, ) } } }; - + ListItem::new(Line::from(Span::styled(text, Style::default().fg(color)))) }) .collect(); diff --git a/src/tui/input.rs b/src/tui/input.rs index 6306e2f..720d2c6 100644 --- a/src/tui/input.rs +++ b/src/tui/input.rs @@ -14,22 +14,18 @@ pub fn render(frame: &mut Frame, area: Rect, app: &App) { app.theme.system_msg, app.file_path_input.as_str(), ), - InputMode::Editing => (" ✏ Message (Esc for commands) ", app.theme.self_name, app.input.as_str()), + InputMode::Editing => ( + " ✏ Message (Esc for commands) ", + app.theme.self_name, + app.input.as_str(), + ), InputMode::Normal => ( " Press i/Enter to type, Q=quit, or use /help ", app.theme.time, app.input.as_str(), ), - InputMode::MicSelect => ( - " 🎤 Selecting microphone... ", - app.theme.border, - "", - ), - InputMode::SpeakerSelect => ( - " 🔊 Selecting speaker... ", - app.theme.border, - "", - ), + InputMode::MicSelect => (" 🎤 Selecting microphone... ", app.theme.input_border, ""), + InputMode::SpeakerSelect => (" 🔊 Selecting speaker... ", app.theme.input_border, ""), }; let block = Block::default() @@ -46,16 +42,10 @@ pub fn render(frame: &mut Frame, area: Rect, app: &App) { // Set cursor position when in editing/file mode match app.input_mode { InputMode::Editing => { - frame.set_cursor_position(( - area.x + 1 + app.cursor_position as u16, - area.y + 1, - )); + frame.set_cursor_position((area.x + 1 + app.cursor_position as u16, area.y + 1)); } InputMode::FilePrompt => { - frame.set_cursor_position(( - area.x + 1 + app.file_path_input.len() as u16, - area.y + 1, - )); + frame.set_cursor_position((area.x + 1 + app.file_path_input.len() as u16, area.y + 1)); } InputMode::Normal => {} InputMode::MicSelect => {} diff --git a/src/tui/mod.rs b/src/tui/mod.rs index 7d09ebb..093453e 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -1,10 +1,10 @@ //! TUI module — terminal user interface using ratatui + crossterm. pub mod chat_panel; -pub mod peer_panel; -pub mod input; -pub mod status_bar; pub mod file_panel; +pub mod input; +pub mod peer_panel; +pub mod status_bar; use std::path::PathBuf; @@ -17,8 +17,8 @@ use ratatui::Frame; use crate::chat::ChatState; use crate::file_transfer::FileTransferManager; -use crate::media::MediaState; use crate::media::voice::AudioDevice; +use crate::media::MediaState; use crate::net::PeerInfo; /// Input mode for the TUI. @@ -45,7 +45,7 @@ pub enum TuiCommand { ToggleVoice, ToggleCamera, ToggleScreen, - SelectMic(String), // node_name of selected mic + SelectMic(String), // node_name of selected mic SelectSpeaker(String), // node_name of selected speaker SetBitrate(u32), Leave, @@ -146,7 +146,8 @@ impl App { let sources = crate::media::voice::list_audio_sources(); if sources.is_empty() { return TuiCommand::SystemMessage( - "🎤 No audio sources found (is PipeWire running?)".to_string(), + "🎤 No audio sources found (is PipeWire running?)" + .to_string(), ); } self.open_mic_select(sources); @@ -157,7 +158,8 @@ impl App { let sinks = crate::media::voice::list_audio_sinks(); if sinks.is_empty() { return TuiCommand::SystemMessage( - "🔊 No audio outputs found (is PipeWire running?)".to_string(), + "🔊 No audio outputs found (is PipeWire running?)" + .to_string(), ); } self.open_speaker_select(sinks); @@ -190,7 +192,8 @@ impl App { } _ => { return TuiCommand::SystemMessage( - "No file dialog available. Use: /file ".to_string(), + "No file dialog available. Use: /file " + .to_string(), ); } } @@ -200,7 +203,9 @@ impl App { "accept" | "a" => { let id_prefix = parts.get(1).unwrap_or(&"").trim(); if id_prefix.is_empty() { - return TuiCommand::SystemMessage("Usage: /accept ".to_string()); + return TuiCommand::SystemMessage( + "Usage: /accept ".to_string(), + ); } return TuiCommand::AcceptFile(id_prefix.to_string()); } @@ -208,7 +213,7 @@ impl App { "leave" => return TuiCommand::Leave, "help" => { return TuiCommand::SystemMessage( - "Commands: /nick , /connect , /voice, /bitrate , /mic, /camera, /screen, /file , /leave, /quit".to_string(), + "Commands: /nick , /connect , /voice, /mic, /camera, /screen, /file , /accept , /leave, /quit".to_string(), ); } "bitrate" => { @@ -216,13 +221,16 @@ impl App { if let Ok(kbps) = kbps_str.parse::() { return TuiCommand::SetBitrate(kbps * 1000); } else { - return TuiCommand::SystemMessage("Usage: /bitrate (e.g. 128)".to_string()); + return TuiCommand::SystemMessage( + "Usage: /bitrate (e.g. 128)".to_string(), + ); } } _ => { - return TuiCommand::SystemMessage( - format!("Unknown command: /{}. Type /help", parts[0]), - ); + return TuiCommand::SystemMessage(format!( + "Unknown command: /{}. Type /help", + parts[0] + )); } } } @@ -358,7 +366,7 @@ impl App { let mode = self.input_mode.clone(); self.input_mode = InputMode::Editing; self.audio_devices.clear(); - + if mode == InputMode::MicSelect { return TuiCommand::SelectMic(node_name); } else { @@ -471,7 +479,7 @@ fn render_device_overlay(frame: &mut Frame, area: Rect, app: &App) { let block = Block::default() .title(title) .borders(Borders::ALL) - .border_style(Style::default().fg(app.theme.border)); + .border_style(Style::default().fg(app.theme.input_border)); let inner = block.inner(popup_area); frame.render_widget(block, popup_area); @@ -481,7 +489,11 @@ fn render_device_overlay(frame: &mut Frame, area: Rect, app: &App) { .iter() .enumerate() .map(|(i, dev)| { - let marker = if i == app.device_selected_index { "▶ " } else { " " }; + let marker = if i == app.device_selected_index { + "▶ " + } else { + " " + }; let style = if i == app.device_selected_index { Style::default() .fg(app.theme.self_name) @@ -499,3 +511,32 @@ fn render_device_overlay(frame: &mut Frame, area: Rect, app: &App) { let list = List::new(items); frame.render_widget(list, inner); } + +/// Deterministic random color for a peer info string. +pub fn get_peer_color(name: &str) -> ratatui::style::Color { + use ratatui::style::Color; + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + name.hash(&mut hasher); + let hash = hasher.finish(); + + // Palette of distinguishable colors (excluding dark/black) + let colors = [ + Color::Red, + Color::Green, + Color::Yellow, + Color::Blue, + Color::Magenta, + Color::Cyan, + Color::LightRed, + Color::LightGreen, + Color::LightYellow, + Color::LightBlue, + Color::LightMagenta, + Color::LightCyan, + ]; + + colors[(hash as usize) % colors.len()] +} diff --git a/src/tui/peer_panel.rs b/src/tui/peer_panel.rs index f895d41..eb9eb9c 100644 --- a/src/tui/peer_panel.rs +++ b/src/tui/peer_panel.rs @@ -14,7 +14,7 @@ pub fn render(frame: &mut Frame, area: Rect, peers: &[PeerInfo], app: &App) { let block = Block::default() .title(format!(" 👥 Peers ({}) ", peers.len())) .borders(Borders::ALL) - .border_style(Style::default().fg(app.theme.border)); + .border_style(Style::default().fg(app.theme.peer_border)); let inner = block.inner(area); frame.render_widget(block, area); @@ -29,10 +29,7 @@ pub fn render(frame: &mut Frame, area: Rect, peers: &[PeerInfo], app: &App) { let items: Vec = peers .iter() .map(|peer| { - let name = peer - .name - .as_deref() - .unwrap_or("unknown"); + let name = peer.name.as_deref().unwrap_or("unknown"); let id_short = format!("{}", peer.id).chars().take(8).collect::(); let caps = peer @@ -56,7 +53,7 @@ pub fn render(frame: &mut Frame, area: Rect, peers: &[PeerInfo], app: &App) { let (marker, marker_color, name_suffix) = if peer.is_self { ("★ ", app.theme.self_name, " (you)") } else { - ("● ", app.theme.peer_name, "") + ("● ", crate::tui::get_peer_color(name), "") }; let line = Line::from(vec![ diff --git a/src/tui/status_bar.rs b/src/tui/status_bar.rs index 568c785..401faf5 100644 --- a/src/tui/status_bar.rs +++ b/src/tui/status_bar.rs @@ -6,8 +6,8 @@ use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::Frame; -use crate::media::MediaState; use super::InputMode; +use crate::media::MediaState; pub fn render( frame: &mut Frame, @@ -25,11 +25,25 @@ pub fn render( }; let mode_span = match input_mode { - InputMode::Normal => Span::styled(" NORMAL ", Style::default().fg(Color::Black).bg(Color::Blue)), - InputMode::Editing => Span::styled(" INSERT ", Style::default().fg(Color::Black).bg(Color::Green)), - InputMode::FilePrompt => Span::styled(" FILE ", Style::default().fg(Color::Black).bg(Color::Yellow)), - InputMode::MicSelect => Span::styled(" MIC ", Style::default().fg(Color::Black).bg(Color::Magenta)), - InputMode::SpeakerSelect => Span::styled(" SPKR ", Style::default().fg(Color::Black).bg(Color::Cyan)), + InputMode::Normal => Span::styled( + " NORMAL ", + Style::default().fg(Color::Black).bg(Color::Blue), + ), + InputMode::Editing => Span::styled( + " INSERT ", + Style::default().fg(Color::Black).bg(Color::Green), + ), + InputMode::FilePrompt => Span::styled( + " FILE ", + Style::default().fg(Color::Black).bg(Color::Yellow), + ), + InputMode::MicSelect => Span::styled( + " MIC ", + Style::default().fg(Color::Black).bg(Color::Magenta), + ), + InputMode::SpeakerSelect => { + Span::styled(" SPKR ", Style::default().fg(Color::Black).bg(Color::Cyan)) + } }; let line = Line::from(vec![ @@ -47,8 +61,7 @@ pub fn render( ), ]); - let paragraph = Paragraph::new(line) - .style(Style::default().bg(Color::Rgb(30, 30, 40))); + let paragraph = Paragraph::new(line).style(Style::default().bg(Color::Rgb(30, 30, 40))); frame.render_widget(paragraph, area); } diff --git a/src/web/mod.rs b/src/web/mod.rs index 3573c47..55dbbd7 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -1,10 +1,10 @@ use axum::{ + body::Bytes, extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, State, }, http::{header, StatusCode, Uri}, - body::Bytes, response::IntoResponse, routing::get, Router, @@ -14,8 +14,8 @@ use std::net::SocketAddr; use crate::media::WebMediaEvent; use crate::protocol::MediaKind; -use tokio::sync::broadcast; use futures::{SinkExt, StreamExt}; +use tokio::sync::broadcast; #[derive(Clone)] struct AppState { @@ -35,7 +35,12 @@ pub async fn start_web_server( cam_tx: broadcast::Sender>, screen_tx: broadcast::Sender>, ) { - let state = AppState { tx, mic_tx, cam_tx, screen_tx }; + let state = AppState { + tx, + mic_tx, + cam_tx, + screen_tx, + }; let app = Router::new() .route("/ws", get(ws_handler)) .fallback(static_handler) @@ -61,10 +66,7 @@ pub async fn start_web_server( axum::serve(listener, app).await.unwrap(); } -async fn ws_handler( - ws: WebSocketUpgrade, - State(state): State, -) -> impl IntoResponse { +async fn ws_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_socket(socket, state)) } @@ -75,20 +77,28 @@ async fn handle_socket(socket: WebSocket, state: AppState) { tokio::spawn(async move { while let Ok(event) = rx.recv().await { let msg = match event { - WebMediaEvent::Audio { peer_id, data: samples } => { - // 1 byte header (0) + 1 byte ID len + ID bytes + f32 bytes - let id_bytes = peer_id.as_bytes(); - let id_len = id_bytes.len() as u8; - let mut payload = Vec::with_capacity(1 + 1 + id_bytes.len() + samples.len() * 4); - payload.push(0u8); - payload.push(id_len); - payload.extend_from_slice(id_bytes); - for s in samples { - payload.extend_from_slice(&s.to_ne_bytes()); - } - Message::Binary(Bytes::from(payload)) + WebMediaEvent::Audio { + peer_id, + data: samples, + } => { + // 1 byte header (0) + 1 byte ID len + ID bytes + f32 bytes + let id_bytes = peer_id.as_bytes(); + let id_len = id_bytes.len() as u8; + let mut payload = + Vec::with_capacity(1 + 1 + id_bytes.len() + samples.len() * 4); + payload.push(0u8); + payload.push(id_len); + payload.extend_from_slice(id_bytes); + for s in samples { + payload.extend_from_slice(&s.to_ne_bytes()); + } + Message::Binary(Bytes::from(payload)) } - WebMediaEvent::Video { peer_id, kind, data } => { + WebMediaEvent::Video { + peer_id, + kind, + data, + } => { // 1 byte header (1=Camera, 2=Screen) + 1 byte ID len + ID bytes + MJPEG data let header = match kind { MediaKind::Camera => 1u8, @@ -97,7 +107,7 @@ async fn handle_socket(socket: WebSocket, state: AppState) { }; let id_bytes = peer_id.as_bytes(); let id_len = id_bytes.len() as u8; - + let mut payload = Vec::with_capacity(1 + 1 + id_bytes.len() + data.len()); payload.push(header); payload.push(id_len); @@ -116,27 +126,32 @@ async fn handle_socket(socket: WebSocket, state: AppState) { while let Some(msg) = receiver.next().await { match msg { Ok(Message::Binary(data)) => { - if data.is_empty() { continue; } + if data.is_empty() { + continue; + } let header = data[0]; let payload = &data[1..]; match header { - 3 => { // Mic (f32 PCM) + 3 => { + // Mic (f32 PCM) // integrity check if payload.len() % 4 == 0 { - let samples: Vec = payload - .chunks_exact(4) - .map(|b| f32::from_ne_bytes([b[0], b[1], b[2], b[3]])) - .collect(); - // tracing::debug!("Received mic samples: {}", samples.len()); - let _ = state.mic_tx.send(samples); + let samples: Vec = payload + .chunks_exact(4) + .map(|b| f32::from_ne_bytes([b[0], b[1], b[2], b[3]])) + .collect(); + // tracing::debug!("Received mic samples: {}", samples.len()); + let _ = state.mic_tx.send(samples); } } - 4 => { // Camera (MJPEG) + 4 => { + // Camera (MJPEG) tracing::debug!("Received camera frame: {} bytes", payload.len()); let _ = state.cam_tx.send(payload.to_vec()); } - 5 => { // Screen (MJPEG) + 5 => { + // Screen (MJPEG) tracing::debug!("Received screen frame: {} bytes", payload.len()); let _ = state.screen_tx.send(payload.to_vec()); } diff --git a/tests/network_tests.rs b/tests/network_tests.rs index dd6e2bd..42c9b01 100644 --- a/tests/network_tests.rs +++ b/tests/network_tests.rs @@ -6,7 +6,13 @@ use tokio::time::{timeout, Duration}; const TEST_TIMEOUT: Duration = Duration::from_secs(10); -async fn spawn_node(topic: [u8; 32]) -> Result<(NetworkManager, tokio::sync::mpsc::Sender, tokio::sync::mpsc::Receiver)> { +async fn spawn_node( + topic: [u8; 32], +) -> Result<( + NetworkManager, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, +)> { NetworkManager::new(topic).await } @@ -27,7 +33,7 @@ async fn test_p2p_connection_and_gossip() -> Result<()> { // 1. Join Gossip // Node A starts alone node_a.join_gossip(vec![], tx_a.clone()).await?; - + // Node B joins, bootstrapping from Node A. // We force a connection first so B knows A's address. node_b.endpoint.connect(addr_a, iroh_gossip::ALPN).await?; @@ -38,20 +44,26 @@ async fn test_p2p_connection_and_gossip() -> Result<()> { let _peer_up_a = timeout(TEST_TIMEOUT, async { while let Some(event) = rx_a.recv().await { if let NetEvent::PeerUp(peer_id) = event { - if peer_id == id_b { return Ok(()); } + if peer_id == id_b { + return Ok(()); + } } } anyhow::bail!("Stream ended without PeerUp"); - }).await??; + }) + .await??; let _peer_up_b = timeout(TEST_TIMEOUT, async { while let Some(event) = rx_b.recv().await { if let NetEvent::PeerUp(peer_id) = event { - if peer_id == id_a { return Ok(()); } + if peer_id == id_a { + return Ok(()); + } } } anyhow::bail!("Stream ended without PeerUp"); - }).await??; + }) + .await??; // 3. Test Gossip Broadcast (Chat Message) let chat_msg = GossipMessage::Chat(ChatMessage { @@ -75,7 +87,8 @@ async fn test_p2p_connection_and_gossip() -> Result<()> { } } anyhow::bail!("Stream ended without GossipReceived"); - }).await??; + }) + .await??; Ok(()) } @@ -96,12 +109,12 @@ async fn test_direct_file_transfer_stream() -> Result<()> { node_b.join_gossip(vec![id_a], tx_b).await?; // 1. Open direct stream from A to B - // node_b needs to accept the stream. In the actual app, this is handled by spawn_acceptor loop + // node_b needs to accept the stream. In the actual app, this is handled by spawn_acceptor loop // sending NetEvent::IncomingFileStream. // A opens stream - let (mut send_stream, mut _recv_stream) = node_a.open_file_stream(id_b).await?; - + let (mut send_stream, mut _recv_stream) = node_a.open_file_stream(id_b).await?; + // Send some data let test_data = b"Hello direct stream"; send_stream.write_all(test_data).await?; @@ -115,7 +128,8 @@ async fn test_direct_file_transfer_stream() -> Result<()> { } } anyhow::bail!("Stream ended without IncomingFileStream"); - }).await??; + }) + .await??; assert_eq!(from, id_a); @@ -156,25 +170,35 @@ async fn test_media_stream_separation() -> Result<()> { // Receiver should get IncomingMediaStream with correct kind timeout(TEST_TIMEOUT, async { while let Some(event) = rx_receiver.recv().await { - if let NetEvent::IncomingMediaStream { from: _, kind: k, .. } = event { + if let NetEvent::IncomingMediaStream { + from: _, kind: k, .. + } = event + { assert_eq!(k, kind); return Ok(()); } } anyhow::bail!("Stream ended without IncomingMediaStream for {:?}", kind); - }).await??; + }) + .await??; Ok(()) } // 1. Verify Voice Stream - verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Voice).await.context("Voice stream failed")?; + verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Voice) + .await + .context("Voice stream failed")?; // 2. Verify Camera Stream - verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Camera).await.context("Camera stream failed")?; + verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Camera) + .await + .context("Camera stream failed")?; // 3. Verify Screen Stream - verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Screen).await.context("Screen stream failed")?; + verify_stream(&node_a, id_b, &mut rx_b, MediaKind::Screen) + .await + .context("Screen stream failed")?; Ok(()) } diff --git a/tests/tests.rs b/tests/tests.rs index a68c4d9..9b00da9 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -199,7 +199,10 @@ mod config_tests { let config = AppConfig::default(); let toml_str = toml::to_string_pretty(&config).unwrap(); let parsed: AppConfig = toml::from_str(&toml_str).unwrap(); - assert_eq!(parsed.media.screen_resolution, config.media.screen_resolution); + assert_eq!( + parsed.media.screen_resolution, + config.media.screen_resolution + ); assert_eq!(parsed.media.mic_name, config.media.mic_name); assert_eq!(parsed.network.topic, config.network.topic); }