From 472b615ea4fb01ba392f9b2c3dcc434cf6708391 Mon Sep 17 00:00:00 2001 From: mixa Date: Wed, 11 Feb 2026 21:32:20 +0300 Subject: [PATCH] ui+faster vc --- src/main.rs | 3 + src/media/capture.rs | 290 +++++-------------------------------------- src/media/mod.rs | 17 ++- src/media/voice.rs | 113 ++++++++++------- src/net/mod.rs | 109 +++++++++++----- web/app.js | 170 ++++++++++++------------- web/index.html | 54 +++++--- web/style.css | 282 +++++++++++++++++++++++++++-------------- 8 files changed, 499 insertions(+), 539 deletions(-) diff --git a/src/main.rs b/src/main.rs index 95baf71..6f716a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -558,6 +558,9 @@ async fn handle_net_event( chat.add_system_message(format!("📡 Incoming {:?} stream from {}", kind, short_id)); media.handle_incoming_media(from, kind, send, recv); } + NetEvent::IncomingDatagram { from, data } => { + media.handle_incoming_datagram(from, data); + } } } diff --git a/src/media/capture.rs b/src/media/capture.rs index ca048e3..67052ab 100644 --- a/src/media/capture.rs +++ b/src/media/capture.rs @@ -3,32 +3,21 @@ //! Captures video by spawning an `ffmpeg` process and reading its stdout. //! Plays video by spawning `mpv` (or `vlc`) and writing to its stdin. -use std::io::Read; -use std::process::{Command as StdCommand, Stdio as StdStdio}; +use anyhow::Result; +use tracing; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread; -use anyhow::{Context, Result}; -use tracing; - -use crate::protocol::{write_framed, decode_framed, MediaKind, MediaStreamMessage}; +use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage}; use crate::media::WebMediaEvent; /// Manages a video capture session (camera or screen). pub struct VideoCapture { - pub kind: MediaKind, // Make kind public or accessible running: Arc, - capture_thread: Option>, - player_thread: Option>, // Keep it for now if handle_incoming_video uses it tasks: Vec>, } -/// Chunk of video data from FFmpeg. -struct VideoChunk { - timestamp_ms: u64, - data: Vec, -} + impl VideoCapture { /// Start video capture with web input (broadcast receiver). @@ -58,115 +47,25 @@ impl VideoCapture { } Ok(Self { - kind, // Added kind running, - capture_thread: None, - player_thread: None, tasks, // Added tasks }) } - /// Start capture of the given `kind` and send video data to peers. - pub fn start( - kind: MediaKind, - peer_streams: Vec<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)>, - resolution: Option<(u32, u32)>, - ) -> Result { - let running = Arc::new(AtomicBool::new(true)); - // Use a bounded channel to backpressure ffmpeg if network is slow - let (chunk_tx, chunk_rx) = crossbeam_channel::bounded::(64); - - let capture_running = running.clone(); - let capture_thread = thread::Builder::new() - .name(format!("ffmpeg-{:?}", kind)) - .spawn(move || { - if let Err(e) = run_ffmpeg_capture(capture_running, chunk_tx, kind, resolution) { - tracing::error!("FFmpeg capture error: {}", e); - } - }) - .context("Failed to spawn ffmpeg capture thread")?; - - let mut tasks = Vec::new(); - // Parameters for signaling (mostly informative now, as ffmpeg controls generic stream) - let (width, height, fps) = match kind { - MediaKind::Camera => (640, 480, 30), - MediaKind::Screen => { - let (w, h) = resolution.unwrap_or((1920, 1080)); - (w, h, 30) - }, - _ => (0, 0, 0), - }; - - for (mut send, _recv) in peer_streams { - let chunk_rx = chunk_rx.clone(); - let running = running.clone(); - - let task = tokio::spawn(async move { - let start = MediaStreamMessage::VideoStart { - kind, - width, - height, - fps, - }; - if let Err(e) = write_framed(&mut send, &start).await { - tracing::error!("Failed to send VideoStart: {}", e); - return; - } - - let mut sequence: u64 = 0; - loop { - if !running.load(Ordering::Relaxed) { - break; - } - match chunk_rx.recv_timeout(std::time::Duration::from_millis(500)) { - Ok(chunk) => { - let msg = MediaStreamMessage::VideoFrame { - sequence, - timestamp_ms: chunk.timestamp_ms, - data: chunk.data, - }; - if let Err(e) = write_framed(&mut send, &msg).await { - tracing::debug!("Failed to send VideoFrame (peer disconnected?): {}", e); - break; - } - sequence += 1; - } - Err(crossbeam_channel::RecvTimeoutError::Timeout) => continue, - Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break, - } - } - - let stop = MediaStreamMessage::VideoStop { kind }; - let _ = write_framed(&mut send, &stop).await; - }); - - tasks.push(task); - } - - Ok(Self { - kind, - running, - capture_thread: Some(capture_thread), - player_thread: None, // Initialized to None for native capture - tasks, - }) - } - /// Stop capture. pub fn stop(&mut self) { self.running.store(false, Ordering::Relaxed); for task in self.tasks.drain(..) { task.abort(); } - if let Some(handle) = self.capture_thread.take() { - let _ = handle.join(); - } - if let Some(handle) = self.player_thread.take() { - let _ = handle.join(); - } } - /// Handle incoming video stream taking the first start message (Web Version). + + + + + /// Handle incoming video stream from a peer (Web Version). + /// Receives video frames (e.g. H.264/VP9 encoded inside protocol messages) and forwards to frontend. pub async fn handle_incoming_video_web( from: iroh::EndpointId, message: MediaStreamMessage, @@ -174,43 +73,32 @@ impl VideoCapture { broadcast_tx: tokio::sync::broadcast::Sender, ) -> Result<()> { let kind = match message { - MediaStreamMessage::VideoStart { kind, .. } => kind, - _ => anyhow::bail!("Invalid start message for video"), + MediaStreamMessage::VideoStart { kind, .. } => kind, + _ => anyhow::bail!("Expected VideoStart"), }; - tracing::info!("Starting {:?} playback via Web Broadcast", kind); + tracing::info!("Starting {:?} stream handler for {}", kind, from); loop { - match decode_framed::(&mut recv).await { - Ok(msg) => match msg { - MediaStreamMessage::VideoFrame { data, .. } => { - // Send to web - let short_id: String = format!("{}", from).chars().take(8).collect(); - let _ = broadcast_tx.send(WebMediaEvent::Video { peer_id: short_id, kind, data }); - } - MediaStreamMessage::VideoStop { .. } => { - break; - } - _ => {} - }, - Err(_) => break, // Stream closed + let msg: MediaStreamMessage = match decode_framed(&mut recv).await { + Ok(m) => m, + Err(_) => break, // EOF + }; + + match msg { + MediaStreamMessage::VideoFrame { data, .. } => { + // Broadcast to web + let short_id: String = format!("{}", from).chars().take(8).collect(); + let _ = broadcast_tx.send(WebMediaEvent::Video { peer_id: short_id, kind: kind.clone(), data }); + } + MediaStreamMessage::VideoStop { .. } => { + tracing::info!("Peer stopped video"); + break; + } + _ => {} } } Ok(()) } - - /// Handle incoming video stream taking the first start message. - pub async fn handle_incoming_video( - message: MediaStreamMessage, - recv: iroh::endpoint::RecvStream, - ) -> Result<()> { - match message { - MediaStreamMessage::VideoStart { kind, .. } => { - tracing::info!("Starting {:?} playback via MPV/VLC", kind); - run_player_loop(recv).await - } - _ => anyhow::bail!("Invalid start message for video"), - } - } } impl Drop for VideoCapture { @@ -262,128 +150,8 @@ async fn run_video_sender_web( Ok(()) } -fn run_ffmpeg_capture( - running: Arc, - chunk_tx: crossbeam_channel::Sender, - kind: MediaKind, - resolution: Option<(u32, u32)>, -) -> Result<()> { - let mut child = match kind { - MediaKind::Camera => { - // Camera: Use ffmpeg directly with v4l2 - let mut cmd = StdCommand::new("ffmpeg"); - cmd.args(&[ - "-f", "v4l2", "-framerate", "30", "-video_size", "640x480", "-i", "/dev/video0", - "-c:v", "mjpeg", "-preset", "ultrafast", "-tune", "zerolatency", "-f", "mpegts", - "-", - ]); - cmd.stdout(StdStdio::piped()).stderr(StdStdio::null()); - cmd.spawn().context("Failed to spawn ffmpeg for camera")? - } - MediaKind::Screen => { - // Screen: Use pipewiresrc (via gst-launch) with MJPEG encoding - // User requested configurable resolution (default 720p). - let (w, h) = resolution.unwrap_or((1920, 1080)); - - // Pipeline: pipewiresrc -> videoscale -> jpegenc -> fdsink - let pipeline = format!( - "gst-launch-1.0 -q pipewiresrc do-timestamp=true \ - ! videoscale ! video/x-raw,width={},height={} \ - ! jpegenc quality=50 \ - ! fdsink", - w, h - ); - - let mut cmd = StdCommand::new("sh"); - cmd.args(&["-c", &pipeline]); - cmd.stdout(StdStdio::piped()).stderr(StdStdio::null()); - cmd.spawn().context("Failed to spawn pipewire capture pipeline")? - } - _ => anyhow::bail!("Unsupported media kind for capture"), - }; - let mut stdout = child.stdout.take().context("Failed to open capture stdout")?; - let mut buffer = [0u8; 4096]; // 4KB chunks - - while running.load(Ordering::Relaxed) { - match stdout.read(&mut buffer) { - Ok(0) => break, // EOF - Ok(n) => { - let chunk = VideoChunk { - timestamp_ms: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64, - data: buffer[..n].to_vec(), - }; - if chunk_tx.send(chunk).is_err() { - break; // Receiver dropped - } - } - Err(e) => { - tracing::error!("Error reading capture stdout: {}", e); - break; - } - } - } - - let _ = child.kill(); - let _ = child.wait(); - Ok(()) -} // --------------------------------------------------------------------------- // Player Logic (MPV/VLC) // --------------------------------------------------------------------------- - -async fn run_player_loop(mut recv: iroh::endpoint::RecvStream) -> Result<()> { - use tokio::process::Command; - use tokio::io::AsyncWriteExt; - use std::process::Stdio; - - // Try spawning mpv - let mut cmd = Command::new("mpv"); - cmd.args(&["--no-cache", "--untimed", "--no-terminal", "--profile=low-latency", "-"]); - cmd.stdin(Stdio::piped()) - .stdout(Stdio::null()) - .stderr(Stdio::null()); - - // We must kill child on drop. - cmd.kill_on_drop(true); - - let mut child = match cmd.spawn() { - Ok(c) => c, - Err(e) => { - tracing::warn!("Failed to spawn mpv ({}), trying vlc...", e); - let mut vlc = Command::new("vlc"); - vlc.args(&["-", "--network-caching=300"]); - vlc.stdin(Stdio::piped()) - .stdout(Stdio::null()) - .stderr(Stdio::null()); - vlc.kill_on_drop(true); - vlc.spawn().context("Failed to spawn vlc either")? - } - }; - - let mut stdin = child.stdin.take().context("Failed to open player stdin")?; - - loop { - match decode_framed::(&mut recv).await { - Ok(msg) => match msg { - MediaStreamMessage::VideoFrame { data, .. } => { - if let Err(_) = stdin.write_all(&data).await { - break; // Player closed - } - } - MediaStreamMessage::VideoStop { .. } => { - break; - } - _ => {} - }, - Err(_) => break, // Stream closed - } - } - - // Child is killed on drop (recv loop end) - Ok(()) -} diff --git a/src/media/mod.rs b/src/media/mod.rs index 5320e9b..20be841 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -226,7 +226,6 @@ impl MediaState { _send: iroh::endpoint::SendStream, mut recv: iroh::endpoint::RecvStream, ) { - let speaker_name = self.speaker_name.clone(); let broadcast_tx = self.broadcast_tx.clone(); // Spawn a task to determine stream type and handle it let handle = tokio::spawn(async move { @@ -266,6 +265,22 @@ impl MediaState { self.incoming_media.retain(|h| !h.is_finished()); } + /// Handle an incoming datagram (unreliable audio/video). + pub fn handle_incoming_datagram(&mut self, from: EndpointId, data: bytes::Bytes) { + // We assume datagrams are for VOICE for now (simplification). + // Or we should add a prefix byte? + // Since we are optimizing audio, let's assume it's audio. + // But if we add video datagrams later... + + // For now, let's try to interpret as audio. + // Since VoiceChat expects `MediaStreamMessage`, we need to see how `postcard` serialized it. + // If sender sends serialized `AudioData`, we can deserialize it. + + if let Some(voice) = &mut self.voice { + voice.handle_datagram(from, data, self.broadcast_tx.clone()); + } + } + // ----------------------------------------------------------------------- // Status for TUI // ----------------------------------------------------------------------- diff --git a/src/media/voice.rs b/src/media/voice.rs index e2a7856..039bafe 100644 --- a/src/media/voice.rs +++ b/src/media/voice.rs @@ -11,8 +11,9 @@ use std::sync::Arc; use std::thread; use anyhow::Result; -use crate::protocol::{write_framed, decode_framed, MediaStreamMessage}; +use crate::protocol::{decode_framed, MediaStreamMessage}; use crate::media::WebMediaEvent; +use postcard; // Use audiopus types directly use audiopus::{coder::Encoder as OpusEncoder, coder::Decoder as OpusDecoder, Channels, Application, SampleRate, Bitrate}; @@ -96,6 +97,7 @@ pub struct VoiceChat { running: Arc, capture_thread: Option>, tasks: Vec>, + datagram_decoders: std::collections::HashMap, } impl VoiceChat { @@ -139,6 +141,7 @@ impl VoiceChat { running, capture_thread: None, tasks, + datagram_decoders: std::collections::HashMap::new(), }) } @@ -205,6 +208,33 @@ impl VoiceChat { } Ok(()) } + + pub fn handle_datagram( + &mut self, + from: iroh::EndpointId, + data: bytes::Bytes, + broadcast_tx: tokio::sync::broadcast::Sender + ) { + match postcard::from_bytes::(&data) { + Ok(MediaStreamMessage::AudioData { opus_data, .. }) => { + let decoder = self.datagram_decoders.entry(from).or_insert_with(|| { + OpusDecoder::new(SampleRate::Hz48000, Channels::Mono).expect("Failed to create decoder") + }); + // Max frame size is ~120ms (5760 samples). Use safe buffer. + let mut pcm = vec![0.0f32; 5760]; + match decoder.decode_float(Some(&opus_data), &mut pcm, false) { + Ok(len) => { + let samples = pcm[..len].to_vec(); + let short_id: String = format!("{}", from).chars().take(8).collect(); + let _ = broadcast_tx.send(WebMediaEvent::Audio { peer_id: short_id, data: samples }); + } + Err(e) => tracing::warn!("Opus decode error: {:?}", e), + } + }, + Ok(_) => {}, // Ignore non-audio datagrams + Err(e) => tracing::warn!("Failed to deserialize datagram: {}", e), + } + } } // --------------------------------------------------------------------------- @@ -222,29 +252,30 @@ async fn run_opis_sender_web_multi( return Ok(()); } - // Open streams to all peers - let mut streams = Vec::new(); + // Connect to all peers to get Connection handles + let mut connections = Vec::new(); for peer in peers { - match network_manager.open_media_stream(peer, crate::protocol::MediaKind::Voice).await { - Ok((mut send, _)) => { - if let Err(e) = write_framed(&mut send, &MediaStreamMessage::AudioStart { - channels: 1, - sample_rate: 48000, - frame_size_ms: FRAME_SIZE_MS as u8, - }).await { - tracing::error!("Failed to send start to {}: {}", peer, e); - continue; - } - streams.push(send); + // We use VOICE_ALPN, but for datagrams ALPN matters for connection establishment. + // If we already have a connection (e.g. gossip), iroh reuses it? + // Yes, connect() returns the existing connection if active. + match network_manager.endpoint.connect(peer, crate::net::VOICE_ALPN).await { + Ok(conn) => { + connections.push(conn); } Err(e) => { - tracing::warn!("Failed to open voice stream to {}: {}", peer, e); + tracing::warn!("Failed to connect to {}: {}", peer, e); } } } - if streams.is_empty() { - tracing::warn!("No peers connected for voice chat"); + if connections.is_empty() { + tracing::warn!("No reachable peers for voice chat"); + // We continue anyway? No, useless. + // But maybe peers come online later? + // For now, fail if no initial connection. + // Or wait? The loop only runs while running is true. + // Let's just return to avoid spinning. + return Ok(()); } let mut encoder = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) @@ -285,28 +316,27 @@ async fn run_opis_sender_web_multi( Ok(len) => { let packet = opus_buffer[..len].to_vec(); let msg = MediaStreamMessage::AudioData { - sequence: 0, // Should maintain seq counter? Protocol doesn't have it? - // Need to check Protocol definition. - // Assuming I need to match variants. - opus_data: packet.clone(), + sequence: 0, // TODO: Add sequence counter for packet loss detection + opus_data: packet, }; - - // Send to all streams - // remove closed streams - // This is async inside a loop which makes ownership tricky for Vec - // but we can iterate. - let mut keep_indices = Vec::new(); - for (i, stream) in streams.iter_mut().enumerate() { - if let Err(e) = write_framed(stream, &msg).await { - tracing::error!("Failed to send audio packet: {}", e); - // mark for removal - } else { - keep_indices.push(i); - } - } - // Naive removal (rebuild vec) if needed, or just warn. - // Ideally remove bad streams. + // Serialize for Datagram (no framing length prefix needed for datagram) + match postcard::to_allocvec(&msg) { + Ok(data) => { + let bytes = bytes::Bytes::from(data); + // Send to all + for (_i, conn) in connections.iter_mut().enumerate() { + if let Err(e) = conn.send_datagram(bytes.clone()) { + tracing::debug!("Failed to send datagram: {}", e); + // Ensure connection is still alive? + // send_datagram fails if connection closed. + // We could remove it. + } + } + // Remove bad connections? + } + Err(e) => tracing::error!("Serialization error: {}", e), + } } Err(e) => { tracing::error!("Opus encode error: {:?}", e); @@ -317,15 +347,10 @@ async fn run_opis_sender_web_multi( Err(tokio::sync::broadcast::error::RecvError::Closed) => break, Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // Skip if lagged + pcm_buffer.clear(); } } } - - // Send Stop - for stream in &mut streams { - let _ = write_framed(stream, &MediaStreamMessage::AudioStop).await; - let _ = stream.finish(); // fire and forget - } - + Ok(()) } diff --git a/src/net/mod.rs b/src/net/mod.rs index 2f266b3..ce0ee23 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -57,6 +57,11 @@ pub enum NetEvent { send: iroh::endpoint::SendStream, recv: iroh::endpoint::RecvStream, }, + /// A datagram was received from a peer. + IncomingDatagram { + from: EndpointId, + data: bytes::Bytes, + }, } /// Information about a connected peer. @@ -289,42 +294,17 @@ impl NetworkManager { tracing::warn!("Gossip failed to handle connection: {}", e); } } else if alpn == FILE_TRANSFER_ALPN || alpn == VOICE_ALPN || alpn == CAMERA_ALPN || alpn == SCREEN_ALPN { - // Handle application protocols - match conn.accept_bi().await { - Ok((send, recv)) => { - let event = if alpn == FILE_TRANSFER_ALPN { - NetEvent::IncomingFileStream { - from: peer_id, - send, - recv, - } - } else { - let kind = if alpn == VOICE_ALPN { - protocol::MediaKind::Voice - } else if alpn == CAMERA_ALPN { - protocol::MediaKind::Camera - } else { - protocol::MediaKind::Screen - }; - NetEvent::IncomingMediaStream { - from: peer_id, - kind, - send, - recv, - } - }; - let _ = event_tx.send(event).await; - } - Err(e) => { - tracing::warn!("Failed to accept bi stream: {}", e); - } + // Handle application protocols with a dedicated loop + tracing::info!("Accepted connection from {} with ALPN {:?}", peer_id, String::from_utf8_lossy(&alpn)); + if let Err(e) = Self::handle_app_connection(conn, alpn, event_tx).await { + tracing::warn!("Connection handler error for {}: {}", peer_id, e); } } else { - tracing::warn!("Ignoring connection with unknown ALPN: {:?}", String::from_utf8_lossy(&alpn)); + tracing::warn!("Ignored connection with unknown ALPN: {:?}", String::from_utf8_lossy(&alpn)); } - } + }, Err(e) => { - tracing::warn!("Failed to establish connection: {}", e); + tracing::warn!("Failed to accept incoming connection: {}", e); } } }); @@ -332,6 +312,71 @@ impl NetworkManager { }); } + async fn handle_app_connection(conn: iroh::endpoint::Connection, alpn: Vec, event_tx: mpsc::Sender) -> Result<()> { + let peer_id = conn.remote_id(); + + loop { + tokio::select! { + // Accept bi-directional streams (reliable) + res = conn.accept_bi() => { + match res { + Ok((send, recv)) => { + let event = if alpn == FILE_TRANSFER_ALPN { + NetEvent::IncomingFileStream { + from: peer_id, + send, + recv, + } + } else { + // Assume media + let kind = if alpn == VOICE_ALPN { + protocol::MediaKind::Voice + } else if alpn == CAMERA_ALPN { + protocol::MediaKind::Camera + } else { + protocol::MediaKind::Screen + }; + NetEvent::IncomingMediaStream { + from: peer_id, + kind, + send, + recv, + } + }; + if event_tx.send(event).await.is_err() { + break; + } + } + Err(e) => { + // Connection closed or error + tracing::debug!("Connection accept_bi closed for {}: {}", peer_id, e); + break; + } + } + } + // Accept datagrams (unreliable) + res = conn.read_datagram() => { + match res { + Ok(data) => { + // Dispatch datagram + if event_tx.send(NetEvent::IncomingDatagram { + from: peer_id, + data, + }).await.is_err() { + break; + } + } + Err(e) => { + tracing::debug!("Connection read_datagram closed for {}: {}", peer_id, e); + break; + } + } + } + } + } + Ok(()) + } + /// Graceful shutdown. pub async fn shutdown(self) -> Result<()> { self.endpoint.close().await; diff --git a/web/app.js b/web/app.js index c67f979..a01d26c 100644 --- a/web/app.js +++ b/web/app.js @@ -1,8 +1,9 @@ const toggleMicBtn = document.getElementById('toggle-mic'); const toggleCamBtn = document.getElementById('toggle-cam'); const toggleScreenBtn = document.getElementById('toggle-screen'); -const statusEl = document.getElementById('status'); -const remoteStreamsContainer = document.getElementById('remote-streams'); +const statusOverlay = document.getElementById('status-overlay'); +const connectionStatus = document.getElementById('connection-status'); +const videoGrid = document.getElementById('video-grid'); const localVideo = document.getElementById('local-video'); // --- Local Media State --- @@ -15,8 +16,6 @@ let audioCtx = null; const SAMPLE_RATE = 48000; // --- Remote Peer State --- -// Map -// Note: We can use a single AudioContext for all peers, but need separate scheduler times. const peers = new Map(); // Initialize shared AudioContext for playback @@ -37,19 +36,23 @@ const ws = new WebSocket(`ws://${location.host}/ws`); ws.binaryType = 'arraybuffer'; ws.onopen = () => { - statusEl.textContent = 'Connected'; - statusEl.style.color = '#4ade80'; + statusOverlay.style.display = 'none'; + connectionStatus.innerHTML = 'wifi'; + connectionStatus.classList.add('connected'); + connectionStatus.title = "Connected"; }; ws.onclose = () => { - statusEl.textContent = 'Disconnected'; - statusEl.style.color = '#f87171'; + statusOverlay.style.display = 'flex'; + statusOverlay.querySelector('h2').textContent = "Disconnected. Reconnecting..."; + connectionStatus.innerHTML = 'wifi_off'; + connectionStatus.classList.remove('connected'); + connectionStatus.title = "Disconnected"; }; ws.onmessage = (event) => { const data = event.data; if (data instanceof ArrayBuffer) { - // [Header(1)][ID_Len(1)][ID...][Data...] const view = new DataView(data); if (view.byteLength < 2) return; @@ -74,37 +77,40 @@ ws.onmessage = (event) => { if (header === 0) { // Audio handleRemoteAudio(peer, payload); - } else if (header === 1) { // Id 1 = Camera - handleRemoteVideo(peer, payload, 'camera'); - } else if (header === 2) { // Id 2 = Screen - handleRemoteVideo(peer, payload, 'screen'); + } else if (header === 1) { // Video (Camera) + handleRemoteVideo(peer, payload); + } else if (header === 2) { // Screen + handleRemoteVideo(peer, payload); // Treat screen same as video for grids } } }; function createPeer(peerId) { - // visual card const card = document.createElement('div'); card.className = 'peer-card'; card.id = `peer-${peerId}`; - const header = document.createElement('div'); - header.className = 'peer-header'; - header.innerHTML = `${peerId} `; - card.appendChild(header); + // Video/Image element + const img = document.createElement('img'); + img.className = 'peer-video'; + img.alt = `Video from ${peerId}`; + card.appendChild(img); - // Container for multiple streams - const mediaContainer = document.createElement('div'); - mediaContainer.className = 'peer-media'; - card.appendChild(mediaContainer); + // Overlay info + const info = document.createElement('div'); + info.className = 'peer-info'; + info.innerHTML = ` +
+ ${peerId.substring(0, 8)}... + `; + card.appendChild(info); - remoteStreamsContainer.appendChild(card); + videoGrid.appendChild(card); return { id: peerId, - mediaContainer: mediaContainer, - camImg: null, - screenImg: null, + imgElement: img, + statusElement: info.querySelector('.peer-status'), nextStartTime: 0, lastActivity: Date.now() }; @@ -121,96 +127,95 @@ function handleRemoteAudio(peer, arrayBuffer) { source.connect(ctx.destination); const now = ctx.currentTime; - - // Reset if behind if (peer.nextStartTime < now) { - peer.nextStartTime = now + 0.02; // Reduced from 0.05 - } - - // Cap if too far ahead (latency reduction) - if (peer.nextStartTime > now + 0.5) { - console.warn("High latency detected, resetting playhead", peer.nextStartTime - now); peer.nextStartTime = now + 0.02; - // Note: This overlaps/mixes with existing queued buffers, but helps catch up. - // Ideally we should stop previous sources, but we don't track them yet. + } + // Latency catch-up + if (peer.nextStartTime > now + 0.5) { + peer.nextStartTime = now + 0.02; } source.start(peer.nextStartTime); peer.nextStartTime += buffer.duration; - // Visual indicator? + // Visual indicator peer.lastActivity = Date.now(); - updatePeerStatus(peer, '🎤'); + updatePeerActivity(peer, true); } -function handleRemoteVideo(peer, arrayBuffer, kind) { +function handleRemoteVideo(peer, arrayBuffer) { const blob = new Blob([arrayBuffer], { type: 'image/jpeg' }); const url = URL.createObjectURL(blob); - let img = kind === 'camera' ? peer.camImg : peer.screenImg; - - if (!img) { - img = document.createElement('img'); - img.className = kind; // 'camera' or 'screen' - img.alt = `${kind} from ${peer.id}`; - peer.mediaContainer.appendChild(img); - if (kind === 'camera') peer.camImg = img; - else peer.screenImg = img; - } - - const prevUrl = img.src; - img.onload = () => { + const prevUrl = peer.imgElement.src; + peer.imgElement.onload = () => { if (prevUrl && prevUrl.startsWith('blob:')) { URL.revokeObjectURL(prevUrl); } }; - img.src = url; + peer.imgElement.src = url; peer.lastActivity = Date.now(); - updatePeerStatus(peer, kind === 'camera' ? '📷' : '🖥'); + updatePeerActivity(peer, false); } -function updatePeerStatus(peer, icon) { - // optionally update status indicators in header +let activityTimeout; +function updatePeerActivity(peer, isAudio) { + if (isAudio) { + peer.statusElement.classList.add('speaking'); + // Debounce removal + if (peer.activityTimeout) clearTimeout(peer.activityTimeout); + peer.activityTimeout = setTimeout(() => { + peer.statusElement.classList.remove('speaking'); + }, 200); + } } // --- Local Capture Controls --- +function updateButton(btn, active, iconOn, iconOff) { + const iconSpan = btn.querySelector('.material-icons'); + if (active) { + btn.classList.add('active'); + // btn.classList.remove('danger'); // Optional: use danger for stop? + iconSpan.textContent = iconOn; + } else { + btn.classList.remove('active'); + iconSpan.textContent = iconOff; + } +} + toggleMicBtn.addEventListener('click', async () => { if (micStream) { stopMic(); - toggleMicBtn.classList.remove('active'); - toggleMicBtn.textContent = 'Start Microphone'; + updateButton(toggleMicBtn, false, 'mic', 'mic_off'); } else { await startMic(); - toggleMicBtn.classList.add('active'); - toggleMicBtn.textContent = 'Stop Microphone'; + updateButton(toggleMicBtn, true, 'mic', 'mic_off'); } }); toggleCamBtn.addEventListener('click', async () => { if (camStream) { stopCam(); - toggleCamBtn.classList.remove('active'); - toggleCamBtn.textContent = 'Start Camera'; + updateButton(toggleCamBtn, false, 'videocam', 'videocam_off'); localVideo.srcObject = null; } else { await startCam(); - toggleCamBtn.classList.add('active'); - toggleCamBtn.textContent = 'Stop Camera'; + updateButton(toggleCamBtn, true, 'videocam', 'videocam_off'); } }); toggleScreenBtn.addEventListener('click', async () => { if (screenStream) { stopScreen(); - toggleScreenBtn.classList.remove('active'); - toggleScreenBtn.textContent = 'Start Screen Share'; - localVideo.srcObject = null; + updateButton(toggleScreenBtn, false, 'screen_share', 'screen_share'); + // Restore cam if active? + if (camStream) localVideo.srcObject = camStream; + else localVideo.srcObject = null; } else { await startScreen(); - toggleScreenBtn.classList.add('active'); - toggleScreenBtn.textContent = 'Stop Screen Share'; + updateButton(toggleScreenBtn, true, 'stop_screen_share', 'screen_share'); } }); @@ -219,26 +224,15 @@ async function startMic() { try { micStream = await navigator.mediaDevices.getUserMedia({ audio: true }); micSource = ctx.createMediaStreamSource(micStream); - // Use smaller buffer for lower latency (4096 -> 2048 or 1024) micScriptProcessor = ctx.createScriptProcessor(2048, 1, 1); micScriptProcessor.onaudioprocess = (e) => { if (!micStream) return; if (ws.readyState === WebSocket.OPEN) { const inputData = e.inputBuffer.getChannelData(0); - // Send: Header 3 (Mic) + Floats - // Optimize: direct Float32Array view - const payloadLen = inputData.length * 4; - const buffer = new ArrayBuffer(1 + payloadLen); + const buffer = new ArrayBuffer(1 + inputData.length * 4); const view = new DataView(buffer); - view.setUint8(0, 3); - - // Fast copy using typed array constructor if possible, but we need byte offset 1 - // Just use loop for now but with reduced overhead? - // actually, we can create a Float32Array on the buffer at offset 4 (idx 1)? - // No, offset must be multiple of element size (4). 1 is not. - // So we have to copy. - // But we can use setFloat32 in loop (as before). + view.setUint8(0, 3); // Header 3 = Mic for (let i = 0; i < inputData.length; i++) { view.setFloat32(1 + i * 4, inputData[i], true); } @@ -252,9 +246,9 @@ async function startMic() { mute.gain.value = 0; micScriptProcessor.connect(mute); mute.connect(ctx.destination); - } catch (err) { console.error('Error starting mic:', err); + alert('Mic access failed'); } } @@ -278,10 +272,10 @@ async function startCam() { try { camStream = await navigator.mediaDevices.getUserMedia({ video: { width: 640, height: 480 } }); localVideo.srcObject = camStream; - startVideoSender(camStream, 4); + startVideoSender(camStream, 4); // 4 = Camera } catch (err) { console.error('Error starting camera:', err); - alert('Failed to access camera'); + alert('Camera access failed'); } } @@ -295,14 +289,12 @@ function stopCam() { async function startScreen() { try { screenStream = await navigator.mediaDevices.getDisplayMedia({ video: true }); - // Prioritize screen for local preview if both active? localVideo.srcObject = screenStream; - startVideoSender(screenStream, 5); + startVideoSender(screenStream, 5); // 5 = Screen screenStream.getVideoTracks()[0].onended = () => { stopScreen(); - toggleScreenBtn.classList.remove('active'); - toggleScreenBtn.textContent = 'Start Screen Share'; + updateButton(toggleScreenBtn, false, 'screen_share', 'screen_share'); }; } catch (err) { console.error('Error starting screen:', err); diff --git a/web/index.html b/web/index.html index 38079c7..5227e14 100644 --- a/web/index.html +++ b/web/index.html @@ -5,30 +5,44 @@ P2P Chat Media + + -
-
-

P2P Chat Media

-
Connecting...
-
- -
-
- -
- -
-

Local Preview

- -
- -
- - - +
+ +
+ +
+
+

Connecting...

+
+
+ + +
+ +
You
+
+ + +
+ + + +
+
+ wifi_off +
+
diff --git a/web/style.css b/web/style.css index a76373f..d91301a 100644 --- a/web/style.css +++ b/web/style.css @@ -1,121 +1,219 @@ -/* Base Styles */ +:root { + --bg-color: #121212; + --card-bg: #1e1e1e; + --primary-color: #3b82f6; + --danger-color: #ef4444; + --text-color: #e0e0e0; + --control-bg: rgba(30, 30, 30, 0.9); +} + body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; - background-color: #1e1e1e; - color: #e0e0e0; + background-color: var(--bg-color); + color: var(--text-color); margin: 0; - padding: 20px; - display: flex; - justify-content: center; + padding: 0; + overflow: hidden; /* Prevent scrollbar */ + height: 100vh; + width: 100vw; } -.container { +#app-container { + position: relative; width: 100%; - max-width: 1200px; -} - -header { - display: flex; - justify-content: space-between; - align-items: center; - margin-bottom: 20px; - border-bottom: 1px solid #333; - padding-bottom: 10px; -} - -.status { - font-weight: bold; - color: #fca5a5; /* Red-ish for disconnected */ -} - -/* Remote Streams Grid */ -.remote-streams { - display: grid; - grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); - gap: 20px; - margin-bottom: 30px; -} - -.peer-card { - background-color: #2d2d2d; - border-radius: 8px; - overflow: hidden; - box-shadow: 0 4px 6px rgba(0,0,0,0.3); + height: 100%; display: flex; flex-direction: column; } -.peer-header { - background-color: #333; - padding: 8px 12px; - font-size: 0.9em; - font-weight: bold; - color: #aaa; - display: flex; - justify-content: space-between; +/* Main Video Grid */ +#video-grid { + flex: 1; + display: grid; + grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); + gap: 16px; + padding: 16px; + align-content: center; + justify-content: center; } -.peer-media { - position: relative; +/* Status Overlay */ +.status-overlay { + position: absolute; + top: 0; + left: 0; width: 100%; - /* Aspect ratio placeholder or min-height */ - min-height: 200px; - background-color: #000; + height: 100%; + background: rgba(0, 0, 0, 0.7); + display: flex; + justify-content: center; + align-items: center; + z-index: 100; +} + +.status-content { + text-align: center; +} + +.spinner { + border: 4px solid rgba(255, 255, 255, 0.1); + border-left-color: var(--primary-color); + border-radius: 50%; + width: 40px; + height: 40px; + animation: spin 1s linear infinite; + margin: 20px auto; +} + +@keyframes spin { + 0% { transform: rotate(0deg); } + 100% { transform: rotate(360deg); } +} + +/* Peer Card */ +.peer-card { + background-color: var(--card-bg); + border-radius: 12px; + overflow: hidden; + position: relative; + aspect-ratio: 16 / 9; + box-shadow: 0 4px 10px rgba(0,0,0,0.5); display: flex; justify-content: center; align-items: center; } -.peer-media img { +.peer-card img, .peer-card video { width: 100%; - height: auto; - display: block; - max-height: 400px; - object-fit: contain; + height: 100%; + object-fit: contain; /* or cover if preferred */ + background: black; } -.peer-media .placeholder { - color: #555; +.peer-info { + position: absolute; + bottom: 10px; + left: 10px; + background: rgba(0, 0, 0, 0.6); + padding: 4px 8px; + border-radius: 4px; font-size: 0.8em; -} - -/* Local Preview */ -.local-preview { - margin-top: 20px; - border-top: 1px solid #333; - padding-top: 20px; - opacity: 0.6; -} - -.local-preview video { - width: 200px; - border-radius: 4px; - background: #000; -} - -/* Controls */ -.audio-controls { - display: flex; - gap: 15px; - margin-top: 20px; - justify-content: center; -} - -button { - padding: 10px 20px; - border: none; - border-radius: 4px; - background-color: #3b82f6; - color: white; font-weight: bold; + display: flex; + align-items: center; + gap: 6px; +} + +.peer-status { + width: 8px; + height: 8px; + border-radius: 50%; + background-color: #aaa; +} + +.peer-status.speaking { + background-color: #4ade80; + box-shadow: 0 0 8px #4ade80; +} + +/* Local Preview (PIP) */ +#local-preview { + position: absolute; + bottom: 80px; + right: 20px; + width: 240px; + aspect-ratio: 16 / 9; + background: black; + border-radius: 12px; + box-shadow: 0 4px 12px rgba(0,0,0,0.6); + overflow: hidden; + border: 2px solid #333; + z-index: 50; + transition: width 0.3s; +} + +#local-preview video { + width: 100%; + height: 100%; + object-fit: cover; + transform: scaleX(-1); /* Mirror local video */ +} + +.local-label { + position: absolute; + bottom: 6px; + left: 8px; + font-size: 0.7em; + background: rgba(0,0,0,0.5); + padding: 2px 6px; + border-radius: 4px; +} + +/* Control Bar */ +#control-bar { + position: absolute; + bottom: 20px; + left: 50%; + transform: translateX(-50%); + background: var(--control-bg); + padding: 10px 20px; + border-radius: 50px; + display: flex; + gap: 16px; + align-items: center; + box-shadow: 0 4px 12px rgba(0,0,0,0.4); + z-index: 60; + backdrop-filter: blur(10px); +} + +.control-btn { + width: 48px; + height: 48px; + border-radius: 50%; + border: none; + background-color: #333; + color: white; cursor: pointer; - transition: background-color 0.2s; + display: flex; + justify-content: center; + align-items: center; + transition: all 0.2s; } -button:hover { - background-color: #2563eb; +.control-btn:hover { + background-color: #444; + transform: scale(1.05); } -button.active { - background-color: #ef4444; /* Red for Stop */ +.control-btn.active { + background-color: var(--primary-color); +} + +.control-btn.danger { + background-color: var(--danger-color); +} + +.material-icons { + font-size: 24px; +} + +#toggle-mic.active .material-icons { + /* mic icon when active */ +} + +/* Simple logic for toggling icons handled in JS or via classes */ + +.separator { + width: 1px; + height: 24px; + background: #555; + margin: 0 4px; +} + +.status-indicator { + color: #f87171; /* Disconnected red */ +} + +.status-indicator.connected { + color: #4ade80; /* Connected green */ }