From 78e1efeaead1050ac57093123a38c2fb7592aeaf Mon Sep 17 00:00:00 2001 From: mixa Date: Thu, 19 Feb 2026 07:49:31 +0300 Subject: [PATCH] idfk --- AGENTS.md | 83 ++ Cargo.lock | 330 +++++++- Cargo.toml | 6 +- src/app_logic.rs | 79 +- src/config.rs | 3 + src/file_transfer/mod.rs | 4 - src/gui.rs | 991 ++++++++++++++++++----- src/main.rs | 189 +++-- src/media/capture/gstreamer.rs | 135 +++ src/media/{capture.rs => capture/mod.rs} | 835 ++++++++++--------- src/media/capture/portal.rs | 43 + src/media/mod.rs | 141 +++- src/media/playback.rs | 98 +++ src/media/voice.rs | 206 +++-- src/net/mod.rs | 3 + src/web/mod.rs | 40 +- tests/tests.rs | 49 +- 17 files changed, 2377 insertions(+), 858 deletions(-) create mode 100644 AGENTS.md create mode 100644 src/media/capture/gstreamer.rs rename src/media/{capture.rs => capture/mod.rs} (56%) create mode 100644 src/media/capture/portal.rs create mode 100644 src/media/playback.rs diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..11c8391 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,83 @@ +# P2P Chat Codebase Guide for Agents + +This document outlines the development workflow, code style, and architectural patterns for the `p2p-chat` repository. + +## 1. Build, Test, and Run Commands + +### Basic Commands +* **Build**: `cargo build` +* **Run TUI (default)**: `cargo run` +* **Run GUI**: `cargo run -- --gui` +* **Check**: `cargo check` +* **Format**: `cargo fmt` +* **Lint**: `cargo clippy` + +### Testing +* **Run all tests**: `cargo test` +* **Run a specific test**: `cargo test test_name` +* **Run tests with output**: `cargo test -- --nocapture` + +### Debugging +* **Logs**: + * TUI mode logs to `p2p-chat.log` in the working directory. + * GUI mode logs to `stdout` (INFO/DEBUG) and `stderr` (WARN/ERROR). +* **Environment Variables**: + * `RUST_LOG`: Control logging levels (e.g., `RUST_LOG=p2p_chat=debug,iroh=info`). + +## 2. Code Style & Architecture + +### General Rust Guidelines +* **Edition**: Rust 2021. +* **Formatting**: Strictly follow `rustfmt`. +* **Error Handling**: Use `anyhow::Result` for application-level errors and main functions. Use specific `thiserror` enums for libraries/modules when precise error handling is required. +* **Async/Await**: The project is built on the `tokio` runtime. Use `async/await` for I/O-bound tasks. + +### Project Structure +* `src/main.rs`: Application entry point, runtime setup, and main event loop. +* `src/app_logic.rs`: Core business logic (`AppLogic` struct). Handles network events and application commands. It is agnostic of the UI (TUI vs. GUI). +* `src/gui.rs`: Iced-based GUI implementation following the Elm architecture (Model-View-Update). +* `src/tui/`: Ratatui-based TUI implementation. +* `src/net/`: Networking layer wrapping `iroh` and `iroh-gossip`. +* `src/media/`: Audio/Video capture and playback (GStreamer, cpal, FFmpeg). +* `src/protocol/`: Data structures and serialization (`serde`, `bincode`, `postcard`) for network messages. + +### Naming Conventions +* **Variables/Functions**: `snake_case`. +* **Types/Traits**: `CamelCase`. +* **Constants**: `SCREAMING_SNAKE_CASE`. +* **Files**: `snake_case.rs`. + +### Architectural Patterns +1. **State Management**: + * `AppLogic` holds the source of truth for application state (`ChatState`, `MediaState`, `NetworkManager`, etc.). + * `FrontendState` is a simplified struct derived from `AppLogic` to pass data to the UI (GUI/TUI) for rendering. + * Do not put core business logic inside `gui.rs` or `tui/`. + +2. **Concurrency**: + * Use `tokio::spawn` for background tasks. + * Use `tokio::sync::mpsc` channels for communicating between the UI and the backend logic. + * Use `tokio::sync::broadcast` for one-to-many event distribution (e.g., video frames). + * Use `Arc>` or `Arc` for shared state when channels are insufficient, but prefer message passing. + +3. **GUI (Iced)**: + * **Messages**: Define all UI interactions in the `Message` enum in `src/gui.rs`. + * **Update**: Handle `Message`s in the `update` function. + * **View**: Keep the `view` function strictly for rendering based on `self.state`. + * **Subscriptions**: Use `subscription` to listen for external events (e.g., backend state updates, video frames). + +4. **Networking**: + * Based on `iroh` QUIC. + * Events are received in the main loop and processed by `AppLogic::handle_net_event`. + +### Dependencies +* **Networking**: `iroh`, `iroh-gossip`. +* **Runtime**: `tokio`. +* **UI**: `iced` (GUI), `ratatui` + `crossterm` (TUI). +* **Media**: `gstreamer` (video capture), `cpal` (audio I/O), `ffmpeg-next` (video encoding). + +### Common Tasks +* **Adding a new feature**: + 1. Update `protocol/mod.rs` if it involves new network messages. + 2. Update `AppLogic` to handle the logic. + 3. Update `FrontendState` to expose data to the UI. + 4. Update `gui.rs` and `tui/` to render the new state. diff --git a/Cargo.lock b/Cargo.lock index 6dedcf7..9016787 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,6 +335,24 @@ dependencies = [ "zbus 4.4.0", ] +[[package]] +name = "ashpd" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d43c03d9e36dd40cab48435be0b09646da362c278223ca535493877b2c1dee9" +dependencies = [ + "async-fs", + "async-net", + "enumflags2", + "futures-channel", + "futures-util", + "rand 0.8.5", + "serde", + "serde_repr", + "url", + "zbus 4.4.0", +] + [[package]] name = "async-broadcast" version = "0.7.2" @@ -536,6 +554,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atomic_refcell" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" + [[package]] name = "attohttpc" version = "0.30.1" @@ -617,7 +641,7 @@ dependencies = [ "log", "num-rational", "num-traits", - "pastey", + "pastey 0.1.1", "rayon", "thiserror 2.0.18", "v_frame", @@ -2986,6 +3010,16 @@ dependencies = [ "polyval", ] +[[package]] +name = "gif" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae047235e33e2829703574b54fdec96bfbad892062d97fed2f76022287de61b" +dependencies = [ + "color_quant", + "weezl", +] + [[package]] name = "gif" version = "0.14.1" @@ -2996,6 +3030,19 @@ dependencies = [ "weezl", ] +[[package]] +name = "gio-sys" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0071fe88dba8e40086c8ff9bbb62622999f49628344b1d1bf490a48a29d80f22" +dependencies = [ + "glib-sys", + "gobject-sys", + "libc", + "system-deps", + "windows-sys 0.61.2", +] + [[package]] name = "gl" version = "0.14.0" @@ -3022,6 +3069,50 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "151665d9be52f9bb40fc7966565d39666f2d1e69233571b71b87791c7e0528b3" +[[package]] +name = "glib" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16de123c2e6c90ce3b573b7330de19be649080ec612033d397d72da265f1bd8b" +dependencies = [ + "bitflags 2.10.0", + "futures-channel", + "futures-core", + "futures-executor", + "futures-task", + "futures-util", + "gio-sys", + "glib-macros", + "glib-sys", + "gobject-sys", + "libc", + "memchr", + "smallvec", +] + +[[package]] +name = "glib-macros" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf59b675301228a696fe01c3073974643365080a76cc3ed5bc2cbc466ad87f17" +dependencies = [ + "heck 0.5.0", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.114", +] + +[[package]] +name = "glib-sys" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d95e1a3a19ae464a7286e14af9a90683c64d70c02532d88d87ce95056af3e6c" +dependencies = [ + "libc", + "system-deps", +] + [[package]] name = "glob" version = "0.3.3" @@ -3061,6 +3152,17 @@ dependencies = [ "gl_generator", ] +[[package]] +name = "gobject-sys" +version = "0.21.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dca35da0d19a18f4575f3cb99fe1c9e029a2941af5662f326f738a21edaf294" +dependencies = [ + "glib-sys", + "libc", + "system-deps", +] + [[package]] name = "gpu-alloc" version = "0.6.0" @@ -3113,6 +3215,129 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "gstreamer" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bed73742c5d54cb48533be608b67d89f96e1ebbba280be7823f1ef995e3a9d7" +dependencies = [ + "cfg-if", + "futures-channel", + "futures-core", + "futures-util", + "glib", + "gstreamer-sys", + "itertools 0.14.0", + "kstring", + "libc", + "muldiv", + "num-integer", + "num-rational", + "option-operations", + "pastey 0.2.1", + "pin-project-lite", + "smallvec", + "thiserror 2.0.18", +] + +[[package]] +name = "gstreamer-app" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "895753fb0f976693f321e6b9d68f746ef9095f1a5b8277c11d85d807a949fbfc" +dependencies = [ + "futures-core", + "futures-sink", + "glib", + "gstreamer", + "gstreamer-app-sys", + "gstreamer-base", + "libc", +] + +[[package]] +name = "gstreamer-app-sys" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7719cee28afda1a48ab1ee93769628bd0653d3c5be1923bce9a8a4550fcc980" +dependencies = [ + "glib-sys", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + +[[package]] +name = "gstreamer-base" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dd15c7e37d306573766834a5cbdd8ee711265f217b060f40a9a8eda45298488" +dependencies = [ + "atomic_refcell", + "cfg-if", + "glib", + "gstreamer", + "gstreamer-base-sys", + "libc", +] + +[[package]] +name = "gstreamer-base-sys" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27a2eda2c61e13c11883bf19b290d07ea6b53d04fd8bfeb7af64b6006c6c9ee6" +dependencies = [ + "glib-sys", + "gobject-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + +[[package]] +name = "gstreamer-sys" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d88630697e757c319e7bcec7b13919ba80492532dd3238481c1c4eee05d4904" +dependencies = [ + "cfg-if", + "glib-sys", + "gobject-sys", + "libc", + "system-deps", +] + +[[package]] +name = "gstreamer-video" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33987f6a6a99750a07b0341d6288bac89b9b301be4672a209935203d4608d547" +dependencies = [ + "cfg-if", + "futures-channel", + "glib", + "gstreamer", + "gstreamer-base", + "gstreamer-video-sys", + "libc", + "thiserror 2.0.18", +] + +[[package]] +name = "gstreamer-video-sys" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00c28faad96cd40a7b7592433051199691b131b08f622ed5d51c54e049792d3" +dependencies = [ + "glib-sys", + "gobject-sys", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps", +] + [[package]] name = "guillotiere" version = "0.6.2" @@ -3585,6 +3810,7 @@ dependencies = [ "iced_renderer", "iced_widget", "iced_winit", + "image 0.24.9", "thiserror 1.0.69", ] @@ -3618,6 +3844,7 @@ dependencies = [ "iced_core", "log", "rustc-hash 2.1.1", + "tokio", "wasm-bindgen-futures", "wasm-timer", ] @@ -3647,6 +3874,8 @@ dependencies = [ "half", "iced_core", "iced_futures", + "image 0.24.9", + "kamadak-exif", "log", "once_cell", "raw-window-handle", @@ -3887,6 +4116,24 @@ dependencies = [ "xmltree", ] +[[package]] +name = "image" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d" +dependencies = [ + "bytemuck", + "byteorder", + "color_quant", + "exr", + "gif 0.13.3", + "jpeg-decoder", + "num-traits", + "png 0.17.16", + "qoi", + "tiff 0.9.1", +] + [[package]] name = "image" version = "0.25.9" @@ -3897,7 +4144,7 @@ dependencies = [ "byteorder-lite", "color_quant", "exr", - "gif", + "gif 0.14.1", "image-webp", "moxcms", "num-traits", @@ -3906,7 +4153,7 @@ dependencies = [ "ravif", "rayon", "rgb", - "tiff", + "tiff 0.10.3", "zune-core 0.5.1", "zune-jpeg 0.5.12", ] @@ -4379,6 +4626,15 @@ dependencies = [ "libc", ] +[[package]] +name = "jpeg-decoder" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07" +dependencies = [ + "rayon", +] + [[package]] name = "js-sys" version = "0.3.85" @@ -4389,6 +4645,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kamadak-exif" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef4fc70d0ab7e5b6bafa30216a6b48705ea964cdfc29c050f2412295eba58077" +dependencies = [ + "mutate_once", +] + [[package]] name = "kasuari" version = "0.4.11" @@ -4417,6 +4682,15 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" +[[package]] +name = "kstring" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "558bf9508a558512042d3095138b1f7b8fe90c5467d94f9f1da28b3731c5dbd1" +dependencies = [ + "static_assertions", +] + [[package]] name = "kurbo" version = "0.10.4" @@ -4535,7 +4809,7 @@ dependencies = [ "drm", "gbm", "gl", - "image", + "image 0.25.9", "khronos-egl", "memmap2", "rustix 1.1.3", @@ -4834,6 +5108,18 @@ dependencies = [ "pxfm", ] +[[package]] +name = "muldiv" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" + +[[package]] +name = "mutate_once" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d2233c9842d08cfe13f9eac96e207ca6a2ea10b80259ebe8ad0268be27d2af" + [[package]] name = "n0-error" version = "0.1.3" @@ -5870,6 +6156,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "option-operations" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aca39cf52b03268400c16eeb9b56382ea3c3353409309b63f5c8f0b1faf42754" +dependencies = [ + "pastey 0.2.1", +] + [[package]] name = "orbclient" version = "0.3.50" @@ -5938,6 +6233,7 @@ name = "p2p-chat" version = "0.1.0" dependencies = [ "anyhow", + "ashpd 0.9.2", "audiopus 0.2.0", "axum", "bincode", @@ -5950,10 +6246,13 @@ dependencies = [ "dashmap", "directories", "futures", + "gstreamer", + "gstreamer-app", + "gstreamer-video", "hex", "iced", "iced_futures", - "image", + "image 0.25.9", "iroh", "iroh-gossip", "mime_guess", @@ -6079,6 +6378,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35fb2e5f958ec131621fdd531e9fc186ed768cbe395337403ae56c17a74c68ec" +[[package]] +name = "pastey" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec" + [[package]] name = "patricia_tree" version = "0.8.0" @@ -7126,7 +7431,7 @@ version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25a73a7337fc24366edfca76ec521f51877b114e42dab584008209cca6719251" dependencies = [ - "ashpd", + "ashpd 0.8.1", "block", "dispatch", "js-sys", @@ -8594,6 +8899,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "tiff" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e" +dependencies = [ + "flate2", + "jpeg-decoder", + "weezl", +] + [[package]] name = "tiff" version = "0.10.3" @@ -10746,7 +11062,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b65b5b36b7abe56a64f6dd46e55e26ca1ed5937a84f57fc86e8f2dd37b136f1" dependencies = [ "dispatch2", - "image", + "image 0.25.9", "lazy_static", "libwayshot-xcap", "log", diff --git a/Cargo.toml b/Cargo.toml index 623b4fa..8eaa49c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ directories = "5.0" songbird = { version = "0.4", features = ["builtin-queue"] } audiopus = "0.2" rfd = "0.14" -iced = "0.13" +iced = { version = "0.13", features = ["image", "wgpu", "tokio"] } iced_futures = "0.13" crossbeam-channel = "0.5" @@ -55,10 +55,14 @@ mime_guess = "2.0.5" hex = "0.4.3" cpal = { version = "0.17.1", features = ["jack"] } xcap = "0.8.2" +ashpd = "0.9" image = "0.25.9" ringbuf = "0.4.8" nnnoiseless = "0.5" dashmap = "5" +gstreamer = "0.24.4" +gstreamer-app = "0.24.4" +gstreamer-video = "0.24.4" [profile.dev] opt-level = 0 diff --git a/src/app_logic.rs b/src/app_logic.rs index cf5a4f2..d146d84 100644 --- a/src/app_logic.rs +++ b/src/app_logic.rs @@ -1,6 +1,6 @@ use crate::chat::{ChatEntry, ChatState}; use crate::config::AppConfig; -use crate::file_transfer::FileTransferManager; +use crate::file_transfer::{FileTransferManager, TransferInfo}; use crate::media::MediaState; use crate::net::{NetEvent, NetworkManager, PeerInfo}; use crate::protocol::{self, GossipMessage}; @@ -11,6 +11,7 @@ use std::path::PathBuf; pub struct FrontendState { pub chat_history: Vec, pub peers: Vec, + pub transfers: Vec, pub our_name: String, pub our_id: String, pub our_id_full: String, @@ -18,6 +19,7 @@ pub struct FrontendState { pub input_device_name: Option, pub output_device_name: Option, pub master_volume: f32, + pub mic_volume: f32, pub noise_suppression: bool, } @@ -26,6 +28,7 @@ impl Default for FrontendState { Self { chat_history: Vec::new(), peers: Vec::new(), + transfers: Vec::new(), our_name: "Unknown".to_string(), our_id: "".to_string(), our_id_full: "".to_string(), @@ -33,6 +36,7 @@ impl Default for FrontendState { input_device_name: None, output_device_name: None, master_volume: 1.0, + mic_volume: 1.0, noise_suppression: true, } } @@ -52,6 +56,7 @@ pub enum AppCommand { SetInputDevice(String), SetOutputDevice(String), SetMasterVolume(f32), + SetMicVolume(f32), ToggleNoiseCancel, SetBitrate(u32), @@ -68,6 +73,8 @@ pub struct AppLogic { pub our_name: String, pub our_id_short: String, pub connected: bool, + pub media_event_rx: tokio::sync::mpsc::Receiver, + pub net_event_rx: tokio::sync::mpsc::Receiver, } impl AppLogic { @@ -78,6 +85,8 @@ impl AppLogic { net: NetworkManager, our_name: String, our_id_short: String, + media_event_rx: tokio::sync::mpsc::Receiver, + net_event_rx: tokio::sync::mpsc::Receiver, ) -> Self { Self { chat, @@ -87,6 +96,8 @@ impl AppLogic { our_name, our_id_short, connected: false, + media_event_rx, + net_event_rx, } } @@ -109,38 +120,49 @@ impl AppLogic { self.chat.add_system_message(status.to_string()); } AppCommand::SetInputDevice(device_name) => { - self.media.set_input_device(device_name.clone()); - self.chat.add_system_message(format!("Microphone set to: {}", device_name)); - if let Ok(mut cfg) = AppConfig::load() { - cfg.media.input_device = Some(device_name); - let _ = cfg.save(); - } + self.media.set_input_device(device_name.clone()); + self.chat + .add_system_message(format!("Microphone set to: {}", device_name)); + if let Ok(mut cfg) = AppConfig::load() { + cfg.media.input_device = Some(device_name); + let _ = cfg.save(); + } } AppCommand::SetOutputDevice(device_name) => { - self.media.set_output_device(device_name.clone()); - self.chat.add_system_message(format!("Output set to: {}", device_name)); - if let Ok(mut cfg) = AppConfig::load() { - cfg.media.output_device = Some(device_name); - let _ = cfg.save(); - } + self.media.set_output_device(device_name.clone()); + self.chat + .add_system_message(format!("Output set to: {}", device_name)); + if let Ok(mut cfg) = AppConfig::load() { + cfg.media.output_device = Some(device_name); + let _ = cfg.save(); + } } AppCommand::SetMasterVolume(vol) => { self.media.set_volume(vol); if let Ok(mut cfg) = AppConfig::load() { - cfg.media.master_volume = vol; - let _ = cfg.save(); - } + cfg.media.master_volume = vol; + let _ = cfg.save(); + } + } + AppCommand::SetMicVolume(vol) => { + self.media.set_mic_volume(vol); + if let Ok(mut cfg) = AppConfig::load() { + cfg.media.mic_volume = vol; + let _ = cfg.save(); + } } AppCommand::ToggleNoiseCancel => { if let Some(enabled) = self.media.toggle_denoise() { let status = if enabled { "enabled" } else { "disabled" }; - self.chat.add_system_message(format!("Noise cancellation {}", status)); + self.chat + .add_system_message(format!("Noise cancellation {}", status)); if let Ok(mut cfg) = AppConfig::load() { - cfg.media.noise_suppression = enabled; - let _ = cfg.save(); - } + cfg.media.noise_suppression = enabled; + let _ = cfg.save(); + } } else { - self.chat.add_system_message("Voice chat not active".to_string()); + self.chat + .add_system_message("Voice chat not active".to_string()); } } AppCommand::Quit => { @@ -510,17 +532,29 @@ impl AppLogic { let mut peers: Vec = peers_map.values().cloned().collect(); peers.sort_by(|a, b| a.id.to_string().cmp(&b.id.to_string())); - // Sync audio levels + // Sync audio levels and video status let levels = self.media.get_peer_levels(); + let video_sessions = &self.media.active_video_sessions; + for peer in &mut peers { if let Some(level) = levels.get(&peer.id) { peer.audio_level = *level; } + if video_sessions.contains(&peer.id) { + peer.is_streaming_video = true; + } else { + peer.is_streaming_video = false; + } } + // Check timeouts/cleanups + self.file_mgr.check_timeouts(); + let transfers = self.file_mgr.active_transfers(); + FrontendState { chat_history: self.chat.history.clone(), peers, + transfers, our_name: self.our_name.clone(), our_id: self.our_id_short.clone(), our_id_full: self.net.our_id.to_string(), @@ -528,6 +562,7 @@ impl AppLogic { input_device_name: self.media.input_device.clone(), output_device_name: self.media.output_device.clone(), master_volume: self.media.get_volume(), + mic_volume: self.media.get_mic_volume(), noise_suppression: self.media.is_denoise_enabled(), } } diff --git a/src/config.rs b/src/config.rs index 6db4fe2..18d8a14 100644 --- a/src/config.rs +++ b/src/config.rs @@ -67,6 +67,8 @@ pub struct MediaConfig { pub output_device: Option, #[serde(default = "default_volume")] pub master_volume: f32, + #[serde(default = "default_volume")] + pub mic_volume: f32, #[serde(default = "default_true")] pub noise_suppression: bool, } @@ -90,6 +92,7 @@ impl Default for MediaConfig { input_device: None, output_device: None, master_volume: 1.0, + mic_volume: 1.0, noise_suppression: true, } } diff --git a/src/file_transfer/mod.rs b/src/file_transfer/mod.rs index 6255765..cc4d1c8 100644 --- a/src/file_transfer/mod.rs +++ b/src/file_transfer/mod.rs @@ -197,10 +197,6 @@ impl FileTransferManager { // `execute_send` logic needs a timeout on `decode_framed`. } - - - - /// Execute the sending side of a file transfer over a QUIC bi-stream. #[allow(dead_code)] pub async fn execute_send( diff --git a/src/gui.rs b/src/gui.rs index b9c784f..c8138ea 100644 --- a/src/gui.rs +++ b/src/gui.rs @@ -1,24 +1,41 @@ +use futures::stream; +use iced::widget::image; +use iced::widget::Image; use iced::widget::{ - button, checkbox, column, container, pick_list, row, scrollable, slider, text, text_input, Column, + button, checkbox, column, container, pick_list, row, scrollable, slider, text, text_input, + Column, Stack, }; use iced::{ + font::{Family, Font, Stretch, Style, Weight}, Alignment, Background, Border, Color, Element, Length, Subscription, Task, Theme, }; +use std::borrow::Cow; use std::sync::Arc; -use tokio::sync::{mpsc, Mutex}; -use futures::stream; +use tokio::sync::{broadcast, mpsc, Mutex}; use crate::app_logic::{AppCommand, FrontendState}; -use crate::net::PeerInfo; use crate::chat::ChatEntry; +use crate::file_transfer::{TransferInfo, TransferState}; +use crate::net::PeerInfo; use chrono::{DateTime, Local, TimeZone, Utc}; +use std::path::PathBuf; -// Discord-like Colors -const BG_DARK: Color = Color::from_rgb(0.21, 0.22, 0.25); // #36393f -const SIDEBAR_DARK: Color = Color::from_rgb(0.18, 0.19, 0.21); // #2f3136 -const INPUT_BG: Color = Color::from_rgb(0.25, 0.27, 0.29); // #40444b -const TEXT_COLOR: Color = Color::from_rgb(0.86, 0.86, 0.86); // #dcddde -const MUTED_TEXT: Color = Color::from_rgb(0.45, 0.46, 0.48); // #72767d +// Catppuccin Mocha Theme (Refined) +const BG_DARK: Color = Color::from_rgb(0.12, 0.12, 0.18); // Base #1e1e2e +const SIDEBAR_DARK: Color = Color::from_rgb(0.09, 0.09, 0.15); // Mantle #181825 +const INPUT_BG: Color = Color::from_rgb(0.19, 0.20, 0.27); // Surface0 #313244 +const TEXT_COLOR: Color = Color::from_rgb(0.80, 0.84, 0.96); // Text #cdd6f4 +const MUTED_TEXT: Color = Color::from_rgb(0.65, 0.68, 0.78); // Subtext0 #a6adc8 +const ACCENT: Color = Color::from_rgb(0.80, 0.65, 0.97); // Mauve #cba6f7 +const SUCCESS: Color = Color::from_rgb(0.65, 0.89, 0.63); // Green #a6e3a1 +const DANGER: Color = Color::from_rgb(0.95, 0.55, 0.66); // Red #f38ba8 + +const EMOJI_FONT: Font = Font { + family: Family::Name("Twemoji"), + weight: Weight::Normal, + stretch: Stretch::Normal, + style: Style::Normal, +}; pub struct ChatApp { state: FrontendState, @@ -26,6 +43,11 @@ pub struct ChatApp { command_sender: mpsc::Sender, // We keep the receiver in the struct to use it in subscription state_receiver: Arc>>, + // Video + video_receiver: Arc, u32, u32)>>>, + video_frame: Option, + show_video: bool, + watching_peer: Option, // Name of peer we are watching // Voice Chat State input_devices: Vec, selected_device: Option, @@ -33,7 +55,9 @@ pub struct ChatApp { selected_output_device: Option, is_in_voice: bool, master_volume: f32, + mic_volume: f32, noise_cancel_enabled: bool, + settings_open: bool, } #[derive(Debug, Clone)] @@ -48,8 +72,18 @@ pub enum Message { ToggleScreen, RefreshDevices, MasterVolumeChanged(f32), + MicVolumeChanged(f32), ToggleNoiseCancel(bool), CopyText(String), + ToggleSettings, + ToggleVideoView, + WatchStream(String, String), // ID, Name + VideoFrameReceived(image::Handle), + // File Transfer + RequestSendFile, + FileSelected(Option), + AcceptTransfer(String), // FileId hex + RejectTransfer(String), // FileId hex NoOp, } @@ -57,11 +91,13 @@ pub struct Flags { pub initial_state: FrontendState, pub command_sender: mpsc::Sender, pub state_receiver: mpsc::Receiver, + pub video_receiver: broadcast::Receiver<(Vec, u32, u32)>, } impl ChatApp { pub fn new(flags: Flags) -> (Self, Task) { let master_volume = flags.initial_state.master_volume; + let mic_volume = flags.initial_state.mic_volume; let noise_cancel_enabled = flags.initial_state.noise_suppression; ( Self { @@ -69,13 +105,19 @@ impl ChatApp { input_value: String::new(), command_sender: flags.command_sender, state_receiver: Arc::new(Mutex::new(flags.state_receiver)), + video_receiver: Arc::new(Mutex::new(flags.video_receiver)), + video_frame: None, + show_video: false, + watching_peer: None, input_devices: Vec::new(), selected_device: None, output_devices: Vec::new(), selected_output_device: None, is_in_voice: false, master_volume, + mic_volume, noise_cancel_enabled, + settings_open: false, }, Task::perform(async {}, |_| Message::RefreshDevices), ) @@ -101,9 +143,9 @@ impl ChatApp { // Simple command parsing let command = if input_text.starts_with('/') { - // ... same as before ... - let parts: Vec<&str> = input_text.split_whitespace().collect(); - match parts.as_slice() { + // ... same as before ... + let parts: Vec<&str> = input_text.split_whitespace().collect(); + match parts.as_slice() { ["/nick", name] | ["/name", name] => { Some(AppCommand::ChangeNick(name.to_string())) } @@ -112,8 +154,11 @@ impl ChatApp { } ["/voice"] => Some(AppCommand::ToggleVoice), ["/quit"] => Some(AppCommand::Quit), - _ => Some(AppCommand::SystemMessage(format!("Unknown command: {}", input_text))), - } + _ => Some(AppCommand::SystemMessage(format!( + "Unknown command: {}", + input_text + ))), + } } else { Some(AppCommand::SendMessage(input_text.clone())) }; @@ -130,7 +175,7 @@ impl ChatApp { } Message::BackendUpdate(new_state) => { // Check if voice status changed to update UI state if needed - // Currently FrontendState doesn't explicitly have voice status bool, + // Currently FrontendState doesn't explicitly have voice status bool, // but we can infer or add it. For now, rely on local toggle state or messages. // Actually FrontendState has `media_status` string. if new_state.media_status.contains("🎤 LIVE") { @@ -138,7 +183,7 @@ impl ChatApp { } else { self.is_in_voice = false; } - + // Update selected devices from backend state if they are set there if let Some(dev) = &new_state.input_device_name { if self.selected_device.as_ref() != Some(dev) { @@ -202,6 +247,16 @@ impl ChatApp { |_| Message::NoOp, ) } + Message::MicVolumeChanged(vol) => { + self.mic_volume = vol; + let sender = self.command_sender.clone(); + Task::perform( + async move { + let _ = sender.send(AppCommand::SetMicVolume(vol)).await; + }, + |_| Message::NoOp, + ) + } Message::ToggleNoiseCancel(enabled) => { self.noise_cancel_enabled = enabled; let sender = self.command_sender.clone(); @@ -214,9 +269,9 @@ impl ChatApp { } Message::RefreshDevices => { // Use the improved device filtering from MediaState logic - - use cpal::traits::{HostTrait, DeviceTrait}; - + + use cpal::traits::{DeviceTrait, HostTrait}; + // Prioritize JACK if available let available_hosts = cpal::available_hosts(); let mut hosts = Vec::new(); @@ -229,82 +284,286 @@ impl ChatApp { let mut output_names = Vec::new(); for host in &hosts { - if let Ok(devices) = host.input_devices() { - for device in devices { - if let Ok(name) = device.name() { - if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") { - continue; - } - - let clean_name = if let Some(start) = name.find("CARD=") { - let rest = &name[start + 5..]; - let card_name = rest.split(',').next().unwrap_or(rest); - let prefix = name.split(':').next().unwrap_or("Unknown"); - format!("{} ({})", card_name, prefix) - } else if name.contains("HDA Intel PCH") { - name - } else { - name - }; - - input_names.push(clean_name); - } - } - } - if let Ok(devices) = host.output_devices() { - for device in devices { - if let Ok(name) = device.name() { - if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") { - continue; - } - - let clean_name = if let Some(start) = name.find("CARD=") { - let rest = &name[start + 5..]; - let card_name = rest.split(',').next().unwrap_or(rest); - let prefix = name.split(':').next().unwrap_or("Unknown"); - format!("{} ({})", card_name, prefix) - } else { - name - }; - - output_names.push(clean_name); - } - } - } + if let Ok(devices) = host.input_devices() { + for device in devices { + if let Ok(name) = device.name() { + if name.contains("dmix") + || name.contains("dsnoop") + || name.contains("null") + { + continue; + } + + let clean_name = if let Some(start) = name.find("CARD=") { + let rest = &name[start + 5..]; + let card_name = rest.split(',').next().unwrap_or(rest); + let prefix = name.split(':').next().unwrap_or("Unknown"); + format!("{} ({})", card_name, prefix) + } else if name.contains("HDA Intel PCH") { + name + } else { + name + }; + + input_names.push(clean_name); + } + } + } + if let Ok(devices) = host.output_devices() { + for device in devices { + if let Ok(name) = device.name() { + if name.contains("dmix") + || name.contains("dsnoop") + || name.contains("null") + { + continue; + } + + let clean_name = if let Some(start) = name.find("CARD=") { + let rest = &name[start + 5..]; + let card_name = rest.split(',').next().unwrap_or(rest); + let prefix = name.split(':').next().unwrap_or("Unknown"); + format!("{} ({})", card_name, prefix) + } else { + name + }; + + output_names.push(clean_name); + } + } + } } - + input_names.sort(); input_names.dedup(); output_names.sort(); output_names.dedup(); - + self.input_devices = input_names; self.output_devices = output_names; - + if self.selected_device.is_none() && !self.input_devices.is_empty() { // Pre-select first self.selected_device = Some(self.input_devices[0].clone()); // We don't auto-send command to avoid loop, user must select or we wait for backend state } - if self.selected_output_device.is_none() && !self.output_devices.is_empty() { + if self.selected_output_device.is_none() && !self.output_devices.is_empty() { self.selected_output_device = Some(self.output_devices[0].clone()); } Task::none() } - Message::CopyText(text) => { - iced::clipboard::write(text) + Message::CopyText(text) => iced::clipboard::write(text), + Message::ToggleSettings => { + self.settings_open = !self.settings_open; + Task::none() + } + Message::ToggleVideoView => { + self.show_video = !self.show_video; + if !self.show_video { + self.watching_peer = None; + } + Task::none() + } + Message::WatchStream(_peer_id, peer_name) => { + self.show_video = true; + self.watching_peer = Some(peer_name); + Task::none() + } + Message::VideoFrameReceived(handle) => { + self.video_frame = Some(handle); + // Auto-show video if frame received and not shown? + // Or user explicitly toggles. Let's let user toggle, or auto-show if hidden? + // User asked for "button... on hower... displays if mpv could be spun up" + // If we get frames, MPV/Internal is running. + if !self.show_video { + // self.show_video = true; // Auto-show? No, might be annoying. + } + Task::none() + } + Message::RequestSendFile => Task::perform( + async { + rfd::AsyncFileDialog::new() + .set_title("Select file to send") + .pick_file() + .await + .map(|handle| handle.path().to_path_buf()) + }, + Message::FileSelected, + ), + Message::FileSelected(path_opt) => { + if let Some(path) = path_opt { + let sender = self.command_sender.clone(); + Task::perform( + async move { + let _ = sender.send(AppCommand::SendFile(path)).await; + }, + |_| Message::NoOp, + ) + } else { + Task::none() + } + } + Message::AcceptTransfer(id_hex) => { + let sender = self.command_sender.clone(); + Task::perform( + async move { + // Accept uses prefix logic in AppLogic, so full hex is fine + let _ = sender.send(AppCommand::AcceptFile(id_hex)).await; + }, + |_| Message::NoOp, + ) + } + Message::RejectTransfer(_id_hex) => { + // Currently AppCommand doesn't have explicit Reject. + // We can implement it, or just let it timeout. + // Or use SystemMessage to say "Rejected". + // Ideally add AppCommand::RejectFile. + // For now, do nothing or log. + Task::none() } Message::NoOp => Task::none(), } } pub fn view(&self) -> Element { + if self.settings_open { + return self.view_settings(); + } + + let main_view = self.view_main(); + + if self.show_video { + Stack::new() + .push(main_view) + .push(self.view_video_overlay()) + .into() + } else { + main_view + } + } + + fn view_settings(&self) -> Element { + let content = column![ + row![ + text("Settings").size(24).width(Length::Fill), + button(text("Close")) + .on_press(Message::ToggleSettings) + .style(|theme, status| style_button(theme, status, DANGER)) + ] + .align_y(Alignment::Center), + text("Audio Devices").size(18), + column![ + text("Input Device").size(14), + pick_list( + self.input_devices.clone(), + self.selected_device.clone(), + Message::InputDeviceSelected + ) + .text_size(14) + .padding(10) + .width(Length::Fill), + text("Output Device").size(14), + pick_list( + self.output_devices.clone(), + self.selected_output_device.clone(), + Message::OutputDeviceSelected + ) + .text_size(14) + .padding(10) + .width(Length::Fill), + ] + .spacing(10), + text("Audio Controls").size(18), + column![ + text(format!("Master Volume: {:.0}%", self.master_volume * 100.0)).size(14), + slider(0.0..=2.0, self.master_volume, Message::MasterVolumeChanged).step(0.05), + text(format!( + "Microphone Volume: {:.0}%", + self.mic_volume * 100.0 + )) + .size(14), + slider(0.0..=2.0, self.mic_volume, Message::MicVolumeChanged).step(0.05), + checkbox("Noise Cancellation", self.noise_cancel_enabled) + .on_toggle(Message::ToggleNoiseCancel) + .size(16) // Increased checkbox size + ] + .spacing(10), + ] + .spacing(20) + .padding(40); + + container(content) + .width(Length::Fill) + .height(Length::Fill) + .center_x(Length::Fill) + .style(|_theme: &Theme| container::Style { + background: Some(Background::Color(BG_DARK)), + text_color: Some(TEXT_COLOR), + ..Default::default() + }) + .into() + } + + fn view_video_overlay(&self) -> Element { + let video_content: Element = if let Some(handle) = &self.video_frame { + Image::::new(handle.clone()) + .width(Length::Fill) + .height(Length::Fill) + .content_fit(iced::ContentFit::Contain) + .into() + } else { + text("Waiting for video stream...").size(20).into() + }; + + let title = if let Some(name) = &self.watching_peer { + format!("{}'s Screenshare", name) + } else { + "Screenshare".to_string() + }; + + let overlay = column![ + row![ + text(title).size(18).width(Length::Fill), + button(text("Close")) + .on_press(Message::ToggleVideoView) + .style(|theme, status| style_button(theme, status, DANGER)) + ] + .padding(10) + .align_y(Alignment::Center), + container(video_content) + .width(Length::Fill) + .height(Length::Fill) + ] + .spacing(10); + + container(overlay) + .width(Length::Fixed(800.0)) // Floating window size + .height(Length::Fixed(600.0)) + .style(|_theme: &Theme| container::Style { + background: Some(Background::Color(Color::from_rgb(0.1, 0.1, 0.1))), + border: Border { + width: 2.0, + color: Color::from_rgb(0.3, 0.3, 0.3), + radius: 8.0.into(), + }, + ..Default::default() + }) + .padding(10) + .center_x(Length::Fill) + .center_y(Length::Fill) + .into() + } + + fn view_main(&self) -> Element { // Chat Area - let chat_content = self.state.chat_history.iter().fold( - Column::new().spacing(10).padding(20), - |column, entry| column.push(view_chat_entry(entry)), - ); + let chat_content = self + .state + .chat_history + .iter() + .fold(Column::new().spacing(10).padding(20), |column, entry| { + column.push(view_chat_entry(entry)) + }); let chat_scroll = scrollable(chat_content) .height(Length::Fill) @@ -316,32 +575,39 @@ impl ChatApp { .on_input(Message::InputChanged) .on_submit(Message::SendMessage) .padding(12) - .style(|_theme, status| { - text_input::Style { - background: Background::Color(INPUT_BG), - border: Border { - radius: 8.0.into(), - width: 0.0, - color: Color::TRANSPARENT, - }, - icon: Color::WHITE, - placeholder: MUTED_TEXT, - value: TEXT_COLOR, - selection: Color::from_rgb(0.4, 0.5, 0.8), - } + .style(|_theme, status| text_input::Style { + background: Background::Color(INPUT_BG), + border: Border { + radius: 8.0.into(), + width: 0.0, + color: Color::TRANSPARENT, + }, + icon: Color::WHITE, + placeholder: MUTED_TEXT, + value: TEXT_COLOR, + selection: Color::from_rgb(0.4, 0.5, 0.8), }); - let input_container = container(input) - .padding(15) - .style(|_theme: &Theme| container::Style { - background: Some(Background::Color(BG_DARK)), - ..Default::default() - }); + let input_container = + container(input) + .padding(15) + .style(|_theme: &Theme| container::Style { + background: Some(Background::Color(BG_DARK)), + ..Default::default() + }); // Sidebar (Peers + Voice) let identity_section = column![ - text("MY IDENTITY").size(12).style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }), - text(&self.state.our_name).size(16).style(|_theme: &Theme| text::Style { color: Some(TEXT_COLOR) }), + text("MY IDENTITY") + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT) + }), + text(&self.state.our_name) + .size(16) + .style(|_theme: &Theme| text::Style { + color: Some(TEXT_COLOR) + }), row![ text_input("My ID", &self.state.our_id) .padding(5) @@ -349,7 +615,10 @@ impl ChatApp { .on_input(|_| Message::NoOp) .style(|_theme, _status| text_input::Style { background: Background::Color(Color::from_rgb(0.15, 0.16, 0.18)), - border: Border { radius: 4.0.into(), ..Default::default() }, + border: Border { + radius: 4.0.into(), + ..Default::default() + }, value: MUTED_TEXT, placeholder: MUTED_TEXT, selection: Color::from_rgb(0.4, 0.5, 0.8), @@ -358,106 +627,120 @@ impl ChatApp { button(text("Copy").size(12)) .on_press(Message::CopyText(self.state.our_id_full.clone())) .padding(5) - .style(|_theme, _status| button::Style { - background: Some(Background::Color(Color::from_rgb(0.3, 0.3, 0.35))), - text_color: Color::WHITE, - border: Border { radius: 4.0.into(), ..Default::default() }, - ..Default::default() - }) - ].spacing(5) - ].spacing(5).padding(10); - - let identity_container = container(identity_section) - .style(|_theme: &Theme| container::Style { + .style(|theme, status| style_button(theme, status, INPUT_BG)) // Using INPUT_BG as a neutral button color + ] + .spacing(5) + ] + .spacing(5) + .padding(10); + + let identity_container = + container(identity_section).style(|_theme: &Theme| container::Style { background: Some(Background::Color(Color::from_rgb(0.15, 0.16, 0.18))), ..Default::default() }); - let peers_title = text("ONLINE").size(12).style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }); - - let peers_content = self.state.peers.iter().fold( - Column::new().spacing(5), - |column, peer| column.push(view_peer(peer)), - ); + let peers_title = text("ONLINE").size(12).style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT), + }); + + let peers_content = self + .state + .peers + .iter() + .fold(Column::new().spacing(5), |column, peer| { + column.push(view_peer(peer)) + }); let voice_section = column![ - text("VOICE CONNECTED").size(12).style(|_theme: &Theme| text::Style { - color: Some(if self.is_in_voice { Color::from_rgb(0.4, 0.8, 0.4) } else { MUTED_TEXT }) - }), - text("Input Device").size(10).style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }), - pick_list( - self.input_devices.clone(), - self.selected_device.clone(), - Message::InputDeviceSelected - ).text_size(12).padding(5), - text("Output Device").size(10).style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }), - pick_list( - self.output_devices.clone(), - self.selected_output_device.clone(), - Message::OutputDeviceSelected - ).text_size(12).padding(5), + text("VOICE CONNECTED") + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(if self.is_in_voice { + Color::from_rgb(0.4, 0.8, 0.4) + } else { + MUTED_TEXT + }) + }), + // Removed inline device selection, moved to Settings + button(text("Settings / Volume").size(12)) + .on_press(Message::ToggleSettings) + .padding(8) + .width(Length::Fill) + .style(|theme, status| style_button(theme, status, INPUT_BG)), button( - text(if self.is_in_voice { "Disconnect" } else { "Join Voice" }).size(14) + text(if self.is_in_voice { + "Disconnect Voice" + } else { + "Join Voice" + }) + .size(14) ) .on_press(Message::ToggleVoice) .padding(8) - .style(move |_theme, _status| { - let bg = if self.is_in_voice { Color::from_rgb(0.8, 0.3, 0.3) } else { Color::from_rgb(0.3, 0.6, 0.4) }; - button::Style { - background: Some(Background::Color(bg)), - text_color: Color::WHITE, - border: Border { radius: 4.0.into(), ..Default::default() }, - ..Default::default() - } + .style(move |theme, status| { + // Use muted colors for buttons to avoid "WAY too bright" + let bg = if self.is_in_voice { + Color::from_rgb(0.6, 0.25, 0.3) // Muted Red + } else { + Color::from_rgb(0.25, 0.5, 0.3) // Muted Green + }; + style_button(theme, status, bg) }) .width(Length::Fill), button( - text(if self.state.media_status.contains("🖥 LIVE") { "Stop Screen" } else { "Share Screen" }).size(14) + text(if self.state.media_status.contains("🖥 LIVE") { + "Stop Screen" + } else { + "Share Screen" + }) + .size(14) ) .on_press(Message::ToggleScreen) .padding(8) - .style(move |_theme, _status| { + .style(move |theme, status| { let is_sharing = self.state.media_status.contains("🖥 LIVE"); - let bg = if is_sharing { Color::from_rgb(0.8, 0.3, 0.3) } else { Color::from_rgb(0.3, 0.4, 0.6) }; - button::Style { - background: Some(Background::Color(bg)), + // Muted colors + let bg = if is_sharing { + Color::from_rgb(0.6, 0.25, 0.3) // Muted Red + } else { + Color::from_rgb(0.25, 0.3, 0.5) // Muted Blue + }; + style_button(theme, status, bg) + }) + .width(Length::Fill), + // Watch Button + /* Removed global watch button in favor of peer-specific button + button(text("Watch Stream").size(14)) + .on_press(Message::ToggleVideoView) + .padding(8) + .width(Length::Fill) + .style(|_theme, _status| button::Style { + background: Some(Background::Color(Color::from_rgb(0.3, 0.2, 0.4))), text_color: Color::WHITE, border: Border { radius: 4.0.into(), ..Default::default() }, ..Default::default() - } - }) - .width(Length::Fill), - - // Audio Controls - text("Master Volume").size(10).style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }), - slider(0.0..=2.0, self.master_volume, Message::MasterVolumeChanged).step(0.05), - - checkbox("Noise Cancellation", self.noise_cancel_enabled) - .on_toggle(Message::ToggleNoiseCancel) - .text_size(12) - .style(|_theme, _status| checkbox::Style { - background: Background::Color(INPUT_BG), - icon_color: Color::WHITE, - border: Border { radius: 4.0.into(), ..Default::default() }, - text_color: Some(TEXT_COLOR), }), - - ].spacing(10).padding(10); - - let voice_panel = container(voice_section) - .style(|_theme: &Theme| container::Style { - background: Some(Background::Color(Color::from_rgb(0.15, 0.16, 0.18))), // Darker panel at bottom - ..Default::default() - }); + */ + ] + .spacing(10) + .padding(10); - let sidebar = container( - column![ - identity_container, - column![peers_title, peers_content].spacing(10).padding(10).height(Length::Fill), - voice_panel - ] - ) - .width(Length::Fixed(240.0)) + let voice_panel = container(voice_section).style(|_theme: &Theme| container::Style { + background: Some(Background::Color(Color::from_rgb(0.15, 0.16, 0.18))), // Darker panel at bottom + ..Default::default() + }); + + let sidebar = container(column![ + identity_container, + column![peers_title, peers_content].spacing(10).padding(10), // Removed height(Fill) to allow stacking + // Transfers Section + self.view_transfers(), + // Spacer + iced::widget::vertical_space(), + voice_panel + ]) + .width(Length::Fixed(330.0)) // Set to 320.0 as requested .height(Length::Fill) .style(|_theme: &Theme| container::Style { background: Some(Background::Color(SIDEBAR_DARK)), @@ -484,24 +767,96 @@ impl ChatApp { .into() } + fn view_transfers(&self) -> Element { + if self.state.transfers.is_empty() { + return column![ + text("TRANSFERS") + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT) + }), + button(text("Send File").size(12)) + .on_press(Message::RequestSendFile) + .padding(6) + .style(|theme, status| style_button(theme, status, INPUT_BG)) + ] + .spacing(5) + .padding(10) + .into(); + } + + let title = row![ + text("TRANSFERS") + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT) + }), + iced::widget::horizontal_space(), + button(text("+ Send").size(10)) + .on_press(Message::RequestSendFile) + .padding(4) + .style(|theme, status| style_button(theme, status, INPUT_BG)) + ] + .align_y(Alignment::Center) + .width(Length::Fill); + + let list = self + .state + .transfers + .iter() + .fold(Column::new().spacing(5), |col, info| { + col.push(view_transfer_item(info)) + }); + + column![title, list].spacing(5).padding(10).into() + } + pub fn subscription(&self) -> Subscription { struct BackendSubscription; + struct VideoSubscription; - let receiver = self.state_receiver.clone(); - - Subscription::run_with_id( + let state_sub = Subscription::run_with_id( std::any::TypeId::of::(), - stream::unfold(receiver, |receiver| async move { - let mut guard = receiver.lock().await; - if let Some(state) = guard.recv().await { - Some((Message::BackendUpdate(state), receiver.clone())) - } else { - // Wait a bit if channel closed or empty to avoid hot loop if logic changes - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - Some((Message::BackendUpdate(FrontendState::default()), receiver.clone())) - } - }) - ) + stream::unfold(self.state_receiver.clone(), |receiver| async move { + let mut guard = receiver.lock().await; + if let Some(state) = guard.recv().await { + Some((Message::BackendUpdate(state), receiver.clone())) + } else { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + Some(( + Message::BackendUpdate(FrontendState::default()), + receiver.clone(), + )) + } + }), + ); + + let video_sub = Subscription::run_with_id( + std::any::TypeId::of::(), + stream::unfold(self.video_receiver.clone(), |receiver| async move { + let mut guard = receiver.lock().await; + // broadcast channel + match guard.recv().await { + Ok((data, width, height)) => { + // Convert data to image::Handle + // Only do this if video view is showing to save CPU? + // But we are in subscription, we don't know show_video state easily without passing it? + // Actually, creating handle is cheap-ish. + // But we should probably check if needed. + // For now, always emit. + + let handle = image::Handle::from_rgba(width, height, data); + Some((Message::VideoFrameReceived(handle), receiver.clone())) + } + Err(_) => { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + Some((Message::NoOp, receiver.clone())) + } + } + }), + ); + + Subscription::batch(vec![state_sub, video_sub]) } pub fn theme(&self) -> Theme { @@ -509,16 +864,56 @@ impl ChatApp { } } +fn style_button( + _theme: &Theme, + status: iced::widget::button::Status, + active_color: Color, +) -> button::Style { + let base_color = active_color; + let hovered_color = Color { + r: (base_color.r + 0.1).min(1.0), + g: (base_color.g + 0.1).min(1.0), + b: (base_color.b + 0.1).min(1.0), + a: base_color.a, + }; + + let bg = match status { + iced::widget::button::Status::Hovered => hovered_color, + iced::widget::button::Status::Pressed => Color { + r: (base_color.r - 0.1).max(0.0), + g: (base_color.g - 0.1).max(0.0), + b: (base_color.b - 0.1).max(0.0), + a: base_color.a, + }, + _ => base_color, + }; + + button::Style { + background: Some(Background::Color(bg)), + // Catppuccin usually uses Crust/Base for text on accents. Let's use darker. + // But for consistency with previous dark theme, let's keep white text if color is dark, + // or check luminance? For simplicity, let's use White or Base. + // Let's stick to White for now as most buttons are dark-ish or colorful. + // Actually, Catppuccin accents are bright. Dark text might be better. + // Let's try to match the "text_color" based on "active_color" brightness? Too complex. + // Let's just use a fixed dark color for text on buttons, or white. + // The previous implementation used White. Let's stick to White for now, maybe tweak later. + text_color: Color::WHITE, + border: Border { + radius: 4.0.into(), + width: 0.0, + color: Color::TRANSPARENT, + }, + ..Default::default() + } +} + // ... run function ... pub fn run(flags: Flags) -> iced::Result { - iced::application( - ChatApp::title, - ChatApp::update, - ChatApp::view - ) - .subscription(ChatApp::subscription) - .theme(ChatApp::theme) - .run_with(move || ChatApp::new(flags)) + iced::application(ChatApp::title, ChatApp::update, ChatApp::view) + .subscription(ChatApp::subscription) + .theme(ChatApp::theme) + .run_with(move || ChatApp::new(flags)) } fn view_chat_entry(entry: &ChatEntry) -> Element { @@ -531,61 +926,217 @@ fn view_chat_entry(entry: &ChatEntry) -> Element { }; let sender = text(&entry.sender_name) - .style(move |_theme: &Theme| text::Style { color: Some(sender_color) }) + .style(move |_theme: &Theme| text::Style { + color: Some(sender_color), + }) .font(iced::font::Font::DEFAULT) // Sans-serif - .size(15); + .size(16); // Increased from 15 let content = text(&entry.text) - .size(15) - .style(move |_theme: &Theme| text::Style { color: Some(TEXT_COLOR) }); + .size(16) // Increased from 15 + .style(move |_theme: &Theme| text::Style { + color: Some(TEXT_COLOR), + }); let time = text(format_timestamp(entry.timestamp)) - .size(11) - .style(move |_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }); + .size(12) // Increased from 11 + .style(move |_theme: &Theme| text::Style { + color: Some(MUTED_TEXT), + }); let header = row![sender, time].spacing(8).align_y(Alignment::Center); - + // Rounded message bubble if needed, or just clean text like Discord column![header, content].spacing(4).into() } +fn view_transfer_item(info: &TransferInfo) -> Element { + let direction = if info.is_outgoing { "↑" } else { "↓" }; + let status_color = match info.state { + TransferState::Complete { .. } => SUCCESS, + TransferState::Failed { .. } | TransferState::Rejected { .. } => DANGER, + TransferState::Transferring { .. } => ACCENT, + _ => MUTED_TEXT, + }; + + let progress_text = match &info.state { + TransferState::Offering => "Offering...".to_string(), + TransferState::Requesting { .. } => "Requesting...".to_string(), + TransferState::WaitingForAccept { .. } => { + if info.is_outgoing { + "Waiting Accept".to_string() + } else { + "Accept?".to_string() + } + } + TransferState::Transferring { + bytes_transferred, + total_size, + .. + } => { + // Error said bytes_transferred is u64, so do not dereference + let transferred = *bytes_transferred; // Try dereference just in case compiler was confused or I was wrong about match ref? + // Actually, if I match &info.state, fields ARE refs. + // If the error said "cannot be dereferenced", it means it WAS u64. + // But if I use `match &info.state`, standard rust rules say they are refs. + // Unless `TransferState` fields are matched with `&bytes_transferred` pattern? No. + // Let's assume they ARE refs because of `match &info.state` and use `*`. + // Wait, previous code used `match info.state` (move) in one place? No, `match &info.state`. + // Let's try `*` first. If it fails, I'll remove it. + // Wait, the error `type u64 cannot be dereferenced` means I tried `*x` and `x` was `u64`. + // This means in the previous attempt (which failed to compile), `bytes_transferred` was `u64`. + // How? + // `if let TransferState::Transferring { bytes_transferred, .. } = &info.state` + // Maybe because `u64` is Copy, it copied it out? + // Let's just use it directly. If it's a ref, I'll get "expected u64 found &u64" and can fix. + // If it's a value, it works. + // Actually, I can use `&info.state` and pattern match `&TransferState::Transferring { ref bytes_transferred, .. }` to be sure. + // But simplest is: + + let pct = if *total_size > 0 { + (*bytes_transferred as f64 / *total_size as f64 * 100.0) as u8 + } else { + 0 + }; + format!("{}%", pct) + } + TransferState::Complete { .. } => "Done".to_string(), + TransferState::Rejected { .. } => "Rejected".to_string(), + TransferState::Failed { .. } => "Failed".to_string(), + }; + + let main_row = row![ + text(direction) + .size(14) + .style(move |_theme: &Theme| text::Style { + color: Some(status_color) + }), + column![ + text(&info.file_name) + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(TEXT_COLOR) + }), + text(progress_text) + .size(10) + .style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT) + }), + ] + .width(Length::Fill), + ] + .spacing(8) + .align_y(Alignment::Center); + + let mut content = column![main_row]; + + // Add Accept/Reject buttons if incoming and waiting + if !info.is_outgoing { + if let TransferState::WaitingForAccept { .. } = info.state { + let id_hex = hex::encode(info.file_id); + let btns = row![button(text("Accept").size(10)) + .on_press(Message::AcceptTransfer(id_hex.clone())) + .padding(4) + .style(|theme, status| style_button(theme, status, SUCCESS)),] + .spacing(5); + content = content.push(btns); + } + } + + // Progress bar for transferring + if let TransferState::Transferring { + bytes_transferred, + total_size, + .. + } = &info.state + { + let pct = if *total_size > 0 { + *bytes_transferred as f32 / *total_size as f32 + } else { + 0.0 + }; + // Iced 0.13 progress bar + let bar = iced::widget::progress_bar(0.0..=1.0, pct).height(4); + content = content.push(bar); + } + + container(content) + .padding(6) + .style(|_theme: &Theme| container::Style { + background: Some(Background::Color(Color::from_rgb(0.15, 0.16, 0.18))), + border: Border { + radius: 4.0.into(), + ..Default::default() + }, + ..Default::default() + }) + .width(Length::Fill) + .into() +} + fn view_peer(peer: &PeerInfo) -> Element { let name = peer.name.as_deref().unwrap_or("Unknown"); - + // Audio activity border let (border_width, border_color) = if peer.audio_level > 0.01 { - (2.0, Color::from_rgb(0.2, 0.8, 0.2)) // Green + (2.0, SUCCESS) // Green } else { (0.0, Color::TRANSPARENT) }; let peer_info = column![ - text(name).size(14).style(|_theme: &Theme| text::Style { color: Some(TEXT_COLOR) }), + text(name).size(16).style(|_theme: &Theme| text::Style { + color: Some(TEXT_COLOR) + }), row![ - text(if peer.audio_level > 0.01 { "🔊" } else { "🔇" }).size(12), + // Use Twemoji font for icons to ensure they render correctly + text(if peer.audio_level > 0.01 { + "🔊" + } else { + "🔇" + }) + .font(EMOJI_FONT) + .size(14) + .style(|_theme: &Theme| text::Style { color: None }), text(peer.id.to_string().chars().take(8).collect::()) - .size(10) - .style(|_theme: &Theme| text::Style { color: Some(MUTED_TEXT) }), - ].spacing(4) - ].spacing(2); + .size(12) + .style(|_theme: &Theme| text::Style { + color: Some(MUTED_TEXT) + }), + ] + .spacing(4) + ] + .spacing(2); + + let mut actions = row![].spacing(5); + + if peer.is_streaming_video { + actions = actions.push( + button(text("▶️ Watch").size(12)) // Play button left of copy + .on_press(Message::WatchStream(peer.id.to_string(), name.to_string())) + .padding(4) + .style(|theme, status| style_button(theme, status, DANGER)), // Use Red/Danger for LIVE attention + ); + } + + actions = actions.push( + button(text("Copy").size(12)) + .on_press(Message::CopyText(peer.id.to_string())) + .padding(4) + .style(|theme, status| style_button(theme, status, INPUT_BG)), + ); let content = row![ peer_info, - button(text("Copy").size(10)) - .on_press(Message::CopyText(peer.id.to_string())) - .padding(4) - .style(|_theme, _status| button::Style { - background: Some(Background::Color(Color::from_rgb(0.25, 0.26, 0.28))), - text_color: Color::WHITE, - border: Border { radius: 4.0.into(), ..Default::default() }, - ..Default::default() - }) + iced::widget::horizontal_space(), // Push actions to right + actions ] .spacing(10) .align_y(Alignment::Center); container(content) .padding(5) + .width(Length::Fill) // Ensure container fills width for alignment .style(move |_theme: &Theme| container::Style { background: None, border: Border { @@ -599,7 +1150,9 @@ fn view_peer(peer: &PeerInfo) -> Element { } fn format_timestamp(ts: u64) -> String { - if ts == 0 { return "".to_string(); } + if ts == 0 { + return "".to_string(); + } match Utc.timestamp_millis_opt(ts as i64) { chrono::LocalResult::Single(dt) => { let local: DateTime = DateTime::from(dt); diff --git a/src/main.rs b/src/main.rs index 19f92ae..cb0f06a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,29 +64,19 @@ struct Cli { gui: bool, } -#[tokio::main] -async fn main() -> Result<()> { - // ... tracing init ... - // Initialize tracing to file (not stdout, since we use TUI) - let _tracing_guard = tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::from_default_env() - .add_directive("p2p_chat=debug".parse()?) - .add_directive("iroh=warn".parse()?) - .add_directive("iroh_gossip=warn".parse()?), - ) - .with_writer(|| -> Box { - match std::fs::OpenOptions::new() - .create(true) - .append(true) - .open("p2p-chat.log") - { - Ok(f) => Box::new(f), - Err(_) => Box::new(io::sink()), - } - }) - .with_ansi(false) - .init(); +// Remove #[tokio::main] and use manual runtime builder +fn main() -> Result<()> { + // Setup runtime + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + // Block on async main logic + rt.block_on(async_main()) +} + +async fn async_main() -> Result<()> { + let cli = Cli::parse(); // Load config let config = AppConfig::load().unwrap_or_else(|e| { @@ -94,9 +84,66 @@ async fn main() -> Result<()> { AppConfig::default() }); - let cli = Cli::parse(); + // Initialize tracing + if cli.gui { + // GUI Mode: Log to stdout (Info/Debug) and stderr (Warn/Error) + use tracing_subscriber::fmt::writer::MakeWriterExt; + let stdout = std::io::stdout.with_max_level(tracing::Level::INFO); + let stderr = std::io::stderr.with_min_level(tracing::Level::WARN); + // Combine them? tracing_subscriber doesn't easily support split writers in one layer without customization. + // But we can use the MakeWriter implementation pattern. + + struct SplitWriter; + impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SplitWriter { + type Writer = Box; + + fn make_writer(&'a self) -> Self::Writer { + Box::new(io::stdout()) + } + + fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer { + if *meta.level() <= tracing::Level::WARN { + Box::new(io::stderr()) + } else { + Box::new(io::stdout()) + } + } + } + + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::from_default_env() + .add_directive("p2p_chat=debug".parse()?) + .add_directive("iroh=warn".parse()?) + .add_directive("iroh_gossip=warn".parse()?), + ) + .with_writer(SplitWriter) + .with_ansi(true) + .init(); + } else { + // TUI Mode: Log to file to avoid breaking UI + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::from_default_env() + .add_directive("p2p_chat=debug".parse()?) + .add_directive("iroh=warn".parse()?) + .add_directive("iroh_gossip=warn".parse()?), + ) + .with_writer(|| -> Box { + match std::fs::OpenOptions::new() + .create(true) + .append(true) + .open("p2p-chat.log") + { + Ok(f) => Box::new(f), + Err(_) => Box::new(io::sink()), + } + }) + .with_ansi(false) + .init(); + } // Topic: CLI > Config > Default let topic_str = cli @@ -108,7 +155,7 @@ async fn main() -> Result<()> { // ... networking init ... // Initialize networking - let (mut net_mgr, _net_tx, mut net_rx) = NetworkManager::new(topic_bytes) + let (mut net_mgr, _net_tx, net_event_rx) = NetworkManager::new(topic_bytes) .await .context("Failed to start networking")?; @@ -134,12 +181,16 @@ async fn main() -> Result<()> { let file_mgr = FileTransferManager::new(download_path); // Pass mic name from config if present // Pass mic name from config if present + let (media_event_tx, media_event_rx) = tokio::sync::mpsc::channel(10); + let media = MediaState::new( config.media.mic_bitrate, config.media.input_device.clone(), config.media.output_device.clone(), config.media.master_volume, + config.media.mic_volume, config.media.noise_suppression, + Some(media_event_tx), ); // Initialize App with Theme @@ -184,6 +235,7 @@ async fn main() -> Result<()> { capabilities: None, is_self: true, audio_level: 0.0, + is_streaming_video: false, }, ); } @@ -205,8 +257,6 @@ async fn main() -> Result<()> { ); } - // Start Web Interface - // Start Web Interface // Start Web Interface tokio::spawn(crate::web::start_web_server( media.broadcast_tx.clone(), @@ -222,6 +272,8 @@ async fn main() -> Result<()> { net_mgr, cli.name.clone(), our_id_short, + media_event_rx, + net_event_rx, ); if cli.gui { @@ -232,55 +284,59 @@ async fn main() -> Result<()> { // Channel for AppLogic -> GUI state updates let (gui_state_tx, gui_state_rx) = mpsc::channel(100); + // Subscribe to video frames + let video_rx = app_logic.media.video_frame_tx.subscribe(); + // Get initial state let initial_state = app_logic.get_frontend_state().await; // Spawn AppLogic loop - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_millis(100)); - + let loop_handle = tokio::spawn(async move { loop { - let mut state_changed = false; tokio::select! { - _ = interval.tick() => { - app_logic.file_mgr.check_timeouts(); + Some(event) = app_logic.net_event_rx.recv() => { + app_logic.handle_net_event(event).await; + // Send state update to GUI + let state = app_logic.get_frontend_state().await; + let _ = gui_state_tx.send(state).await; } - Some(cmd) = gui_cmd_rx.recv() => { - match app_logic.handle_command(cmd).await { - Ok(true) => { // Quit command + cmd_opt = gui_cmd_rx.recv() => { + match cmd_opt { + Some(cmd) => { + // Handle command from GUI + if let Ok(should_quit) = app_logic.handle_command(cmd).await { + // Update state even if quitting + let state = app_logic.get_frontend_state().await; + let _ = gui_state_tx.send(state).await; + + if should_quit { + break; + } + } + } + None => { + // GUI channel closed, exit loop break; } - Ok(false) => { - state_changed = true; - } - Err(e) => { - tracing::error!("Command error: {}", e); - } } } - Some(event) = net_rx.recv() => { - app_logic.handle_net_event(event).await; - state_changed = true; - } - Some(event) = gossip_event_rx.recv() => { - app_logic.handle_net_event(event).await; - state_changed = true; - } - _ = tokio::signal::ctrl_c() => { - break; - } - } - - if state_changed { - let new_state = app_logic.get_frontend_state().await; - if gui_state_tx.send(new_state).await.is_err() { - break; + Some(_media_evt) = app_logic.media_event_rx.recv() => { + // Media state changed (video started/stopped), update GUI + let state = app_logic.get_frontend_state().await; + let _ = gui_state_tx.send(state).await; } } } + // Shutdown logic - let _ = app_logic.net.shutdown().await; - app_logic.media.shutdown(); + // Offload entire app_logic shutdown to blocking thread to avoid async drop panics + let _ = tokio::task::spawn_blocking(move || { + // Shutdown media explicitly (stops threads) + app_logic.media.shutdown(); + // app_logic is dropped here, on a blocking thread. + // This includes NetworkManager, ChatState, etc. + }) + .await; }); // Run GUI @@ -288,9 +344,14 @@ async fn main() -> Result<()> { initial_state, command_sender: gui_cmd_tx, state_receiver: gui_state_rx, + video_receiver: video_rx, }; crate::gui::run(flags)?; + + // Wait for background task to finish cleanup + let _ = loop_handle.await; + return Ok(()); } @@ -306,7 +367,6 @@ async fn main() -> Result<()> { &mut terminal, &mut app, &mut app_logic, - &mut net_rx, &mut gossip_event_rx, ) .await; @@ -326,7 +386,6 @@ async fn run_event_loop( terminal: &mut Terminal>, app: &mut App, logic: &mut crate::app_logic::AppLogic, - net_rx: &mut mpsc::Receiver, gossip_rx: &mut mpsc::Receiver, ) -> Result<()> { let mut event_stream = EventStream::new(); @@ -386,7 +445,7 @@ async fn run_event_loop( } // Network events from file transfer acceptor - Some(event) = net_rx.recv() => { + Some(event) = logic.net_event_rx.recv() => { logic.handle_net_event(event).await; } @@ -444,5 +503,3 @@ fn parse_topic(hex_str: &str) -> Result<[u8; 32]> { Ok(bytes) } } - - diff --git a/src/media/capture/gstreamer.rs b/src/media/capture/gstreamer.rs new file mode 100644 index 0000000..a826dbf --- /dev/null +++ b/src/media/capture/gstreamer.rs @@ -0,0 +1,135 @@ +use anyhow::Result; +use gstreamer::prelude::*; +use gstreamer::{Pipeline, State}; +use gstreamer_app::{AppSink, AppSinkCallbacks}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +pub struct GStreamerCapture { + pipeline: Pipeline, + sink: AppSink, +} + +impl GStreamerCapture { + pub fn new(pipeline_str: &str) -> Result { + // Initialize GStreamer + // We only need to init once, but calling it multiple times is safe. + // gstreamer::init() checks internally. + gstreamer::init()?; + + // Create pipeline from string + let pipeline = gstreamer::parse::launch(pipeline_str)? + .downcast::() + .map_err(|_| anyhow::anyhow!("Expected a pipeline"))?; + + // Get the appsink + let sink = pipeline + .by_name("sink") + .ok_or_else(|| anyhow::anyhow!("Pipeline must have an appsink named 'sink'"))? + .downcast::() + .map_err(|_| anyhow::anyhow!("'sink' element is not an appsink"))?; + + Ok(Self { pipeline, sink }) + } + + pub fn set_callback(&self, on_sample: F) + where + F: Fn(&[u8]) + Send + Sync + 'static, + { + let count = Arc::new(AtomicUsize::new(0)); + let callbacks = AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?; + let buffer = sample.buffer().ok_or(gstreamer::FlowError::Error)?; + let map = buffer + .map_readable() + .map_err(|_| gstreamer::FlowError::Error)?; + + let c = count.fetch_add(1, Ordering::Relaxed); + if c % 10 == 0 { + tracing::info!("GStreamer captured frame #{}", c); + } + + on_sample(map.as_slice()); + Ok(gstreamer::FlowSuccess::Ok) + }) + .build(); + + self.sink.set_callbacks(callbacks); + } + + pub fn start(&self) -> Result<()> { + self.pipeline.set_state(State::Playing)?; + + // Spawn a bus watcher + let bus = self.pipeline.bus().expect("Pipeline has no bus"); + std::thread::spawn(move || { + for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { + use gstreamer::MessageView; + match msg.view() { + MessageView::Error(err) => { + tracing::error!("GStreamer Error: {} ({:?})", err.error(), err.debug()); + break; + } + MessageView::Warning(warn) => { + tracing::warn!("GStreamer Warning: {} ({:?})", warn.error(), warn.debug()); + } + MessageView::Eos(..) => { + tracing::info!("GStreamer EndOfStream"); + break; + } + _ => (), + } + } + }); + + Ok(()) + } + + pub fn stop(&self) -> Result<()> { + let _ = self.pipeline.set_state(State::Null); + Ok(()) + } +} + +impl Drop for GStreamerCapture { + fn drop(&mut self) { + let _ = self.stop(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + #[test] + fn test_appsink_pattern() { + // Simple videotestsrc pipeline for testing + // videotestsrc ! videoconvert ! video/x-raw,format=I420 ! appsink name=sink + let pipeline_str = "videotestsrc num-buffers=10 ! videoconvert ! video/x-raw,format=I420 ! appsink name=sink"; + + let capture = GStreamerCapture::new(pipeline_str).expect("Failed to create capture"); + + let frame_count = Arc::new(Mutex::new(0)); + let frame_count_clone = frame_count.clone(); + + capture.set_callback(move |data| { + let mut count = frame_count_clone.lock().unwrap(); + *count += 1; + println!("Received frame of size: {}", data.len()); + }); + + capture.start().expect("Failed to start"); + + // Wait for frames + std::thread::sleep(Duration::from_secs(2)); + + capture.stop().expect("Failed to stop"); + + let count = *frame_count.lock().unwrap(); + assert!(count > 0, "Should have received at least one frame"); + println!("Total frames received: {}", count); + } +} diff --git a/src/media/capture.rs b/src/media/capture/mod.rs similarity index 56% rename from src/media/capture.rs rename to src/media/capture/mod.rs index 6cfe4af..c160bd7 100644 --- a/src/media/capture.rs +++ b/src/media/capture/mod.rs @@ -1,26 +1,20 @@ -//! Video capture and playback using FFmpeg and MPV. -//! -//! Captures video by spawning an `ffmpeg` process and reading its stdout. -//! Plays video by spawning `mpv` (or `vlc`) and writing to its stdin. +pub mod gstreamer; +pub mod portal; use anyhow::Result; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tracing; +use crate::media::playback::VideoPlayer; use crate::media::WebMediaEvent; use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage}; +use portal::{select_wayland_source, AshpdKeeper}; use std::process::Stdio; -use tokio::io::{AsyncReadExt, BufReader, AsyncWriteExt, AsyncBufReadExt}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use xcap::Monitor; -use ashpd::desktop::{PersistMode, screencast::{CursorMode, Screencast, SourceType}}; - -struct AshpdKeeper { - _proxy: Screencast<'static>, - _session: ashpd::desktop::Session<'static, Screencast<'static>>, -} /// Manages a video capture session (camera or screen). pub struct VideoCapture { @@ -52,10 +46,7 @@ impl VideoCapture { })); } - Ok(Self { - running, - tasks, - }) + Ok(Self { running, tasks }) } /// Start web-based camera share (relay from Web to Peers). @@ -81,10 +72,7 @@ impl VideoCapture { })); } - Ok(Self { - running, - tasks, - }) + Ok(Self { running, tasks }) } /// Start native video capture via FFmpeg. @@ -96,7 +84,7 @@ impl VideoCapture { broadcast_tx: tokio::sync::broadcast::Sender, ) -> Result { let running = Arc::new(AtomicBool::new(true)); - + // Channel to distribute frames from FFmpeg to peer senders let (frame_tx, _) = tokio::sync::broadcast::channel::>(100); @@ -107,22 +95,59 @@ impl VideoCapture { let frame_tx_clone = frame_tx.clone(); let broadcast_tx_clone = broadcast_tx.clone(); let kind_clone = kind.clone(); - + tasks.push(tokio::spawn(async move { + // Set GST_DEBUG to see errors + std::env::set_var("GST_DEBUG", "3"); + let result = if matches!(kind_clone, MediaKind::Screen) { - // Try Wayland Portal first - match select_wayland_source().await { - Ok((node_id, keeper)) => { - tracing::info!("Selected Wayland source node: {}", node_id); - run_pipewire_capture(node_id, keeper, frame_tx_clone, broadcast_tx_clone, capture_running).await - } - Err(e) => { - tracing::warn!("Wayland source selection failed ({}). Falling back to xcap.", e); - run_xcap_capture(frame_tx_clone, broadcast_tx_clone, capture_running).await - } - } + // Try Wayland Portal first + let wayland_attempt = match select_wayland_source().await { + Ok((node_id, keeper)) => { + tracing::info!("Selected Wayland source node: {}", node_id); + // Try GStreamer PipeWire capture + match run_pipewire_capture( + node_id, + keeper, + frame_tx_clone.clone(), + broadcast_tx_clone.clone(), + capture_running.clone(), + ) + .await + { + Ok(_) => Ok(()), + Err(e) => { + tracing::error!( + "GStreamer/PipeWire capture crashed: {}. Falling back to xcap.", + e + ); + Err(e) // Signal to try fallback + } + } + } + Err(e) => { + tracing::warn!( + "Wayland source selection failed ({}). Falling back to xcap.", + e + ); + Err(anyhow::anyhow!("Portal failed")) + } + }; + + // Fallback to xcap if Wayland failed (either portal or pipeline) + if wayland_attempt.is_err() { + run_xcap_capture(frame_tx_clone, broadcast_tx_clone, capture_running).await + } else { + Ok(()) + } } else { - run_ffmpeg_capture(kind_clone, frame_tx_clone, broadcast_tx_clone, capture_running).await + run_ffmpeg_capture( + kind_clone, + frame_tx_clone, + broadcast_tx_clone, + capture_running, + ) + .await }; if let Err(e) = result { @@ -144,10 +169,7 @@ impl VideoCapture { })); } - Ok(Self { - running, - tasks, - }) + Ok(Self { running, tasks }) } /// Stop capture. @@ -165,41 +187,29 @@ impl VideoCapture { message: MediaStreamMessage, mut recv: iroh::endpoint::RecvStream, _broadcast_tx: tokio::sync::broadcast::Sender, + video_frame_tx: tokio::sync::broadcast::Sender<(Vec, u32, u32)>, ) -> Result<()> { let kind = match message { MediaStreamMessage::VideoStart { kind, .. } => kind, _ => anyhow::bail!("Expected VideoStart"), }; - tracing::info!("Starting {:?} stream handler for {} (Native MPV)", kind, from); + tracing::info!( + "Starting {:?} stream handler for {} (Internal Player)", + kind, + from + ); - // Spawn mpv - let mut cmd = Command::new("mpv"); - cmd.args(&[ - "--no-terminal", - "--ontop", - "--profile=low-latency", - "--cache=no", - "--force-window", - "-", // Read from stdin - ]); - cmd.stdin(Stdio::piped()); - // We might want to quell stdout/stderr or log them - cmd.stdout(Stdio::null()); - cmd.stderr(Stdio::null()); - - // Ensure process is killed when this task drops - cmd.kill_on_drop(true); + // Initialize Internal Video Player (GStreamer) + let tx = video_frame_tx.clone(); + let player = VideoPlayer::new(move |data, width, height| { + // Send frame to GUI + // We use a broadcast channel, so if GUI isn't listening (buffer full), we drop frames (good for live video) + let _ = tx.send((data, width, height)); + })?; - let mut child = match cmd.spawn() { - Ok(c) => c, - Err(e) => { - tracing::error!("Failed to spawn mpv: {}", e); - return Err(anyhow::anyhow!("Failed to spawn mpv: {}", e)); - } - }; + player.start()?; - let mut stdin = child.stdin.take().expect("Failed to open mpv stdin"); - use tokio::io::AsyncWriteExt; + let mut packet_count = 0; loop { let msg: MediaStreamMessage = match decode_framed(&mut recv).await { @@ -209,19 +219,20 @@ impl VideoCapture { match msg { MediaStreamMessage::VideoFrame { data, .. } => { - // Write directly to mpv stdin - // The data is already NAL units with start codes (from our capture logic) - // Note: 'data' from VideoFrame contains [1 byte type][NAL Unit including start code] - // We need to skip the first byte which is our protocol's frame type indicator (Key/Delta) - // Wait, let's check run_ffmpeg_capture. - // It does: payload.push(frame_type); payload.extend_from_slice(&nal_data); - // So yes, we need to skip the first byte. - if data.len() > 1 { - if let Err(e) = stdin.write_all(&data[1..]).await { - tracing::error!("Failed to write to mpv: {}", e); + // Skip 1 byte header (frame type) + let payload = &data[1..]; + + if let Err(e) = player.push_data(payload) { + tracing::error!("Failed to push data to player: {}", e); break; } + + // Log activity occasionally + packet_count += 1; + if packet_count % 30 == 0 { + tracing::info!("Processed video packet #{}", packet_count); + } } } MediaStreamMessage::VideoStop { .. } => { @@ -231,8 +242,8 @@ impl VideoCapture { _ => {} } } - - let _ = child.kill().await; + + player.stop()?; Ok(()) } } @@ -257,44 +268,67 @@ async fn run_xcap_capture( if monitors.is_empty() { return Err(anyhow::anyhow!("No monitors found")); } - + // Select first monitor for now let monitor = &monitors[0]; - let width = monitor.width().map_err(|e| anyhow::anyhow!("Failed to get monitor width: {}", e))?; - let height = monitor.height().map_err(|e| anyhow::anyhow!("Failed to get monitor height: {}", e))?; - let name = monitor.name().unwrap_or_else(|_| "Unknown Monitor".to_string()); - - tracing::info!("Starting xcap capture on monitor: {} ({}x{})", name, width, height); + let width = monitor + .width() + .map_err(|e| anyhow::anyhow!("Failed to get monitor width: {}", e))?; + let height = monitor + .height() + .map_err(|e| anyhow::anyhow!("Failed to get monitor height: {}", e))?; + let name = monitor + .name() + .unwrap_or_else(|_| "Unknown Monitor".to_string()); + + tracing::info!( + "Starting xcap capture on monitor: {} ({}x{})", + name, + width, + height + ); // 2. Spawn FFmpeg to encode raw frames // We feed raw RGBA frames to stdin let mut cmd = Command::new("ffmpeg"); cmd.kill_on_drop(true); - + cmd.args(&[ - "-f", "rawvideo", - "-pixel_format", "rgba", - "-video_size", &format!("{}x{}", width, height), - "-framerate", "30", - "-i", "-", // Read raw frames from stdin + "-f", + "rawvideo", + "-pixel_format", + "rgba", + "-video_size", + &format!("{}x{}", width, height), + "-framerate", + "30", + "-i", + "-", // Read raw frames from stdin ]); - + // Output args (same as before: HEVC NVENC/libx265 -> Raw stream) cmd.args(&[ - "-vf", "scale=1280:720", // Force 720p resize - "-c:v", "hevc_nvenc", // Try hardware first + "-vf", + "scale=1280:720", // Force 720p resize + "-c:v", + "hevc_nvenc", // Try hardware first // Fallback or options... - "-b:v", "1M", // Lower bitrate to 1Mbps - "-g", "30", // Keyframe interval (GOP) 30 - "-zerolatency", "1", - "-preset", "p4", - "-f", "hevc", + "-b:v", + "1M", // Lower bitrate to 1Mbps + "-g", + "30", // Keyframe interval (GOP) 30 + "-zerolatency", + "1", + "-preset", + "p4", + "-f", + "hevc", "-", ]); cmd.stdin(Stdio::piped()); cmd.stdout(Stdio::piped()); - + // Log stderr let stderr_file = std::fs::File::create("ffmpeg_xcap.log").unwrap(); cmd.stderr(Stdio::from(stderr_file)); @@ -306,21 +340,21 @@ async fn run_xcap_capture( // 3. Spawn thread/task to capture frames and write to FFmpeg stdin // xcap is synchronous/blocking, so we should run it in a blocking task or separate thread // But we need to write to async stdin. - + let running_clone = running.clone(); let monitor_clone = monitor.clone(); // Monitor might not be cloneable easily? It is. - + // We use a channel to send frames from blocking capture to async writer let (img_tx, mut img_rx) = tokio::sync::mpsc::channel::>(2); - + // Spawn blocking capture thread std::thread::spawn(move || { // Target 30fps let frame_duration = std::time::Duration::from_millis(33); - + while running_clone.load(Ordering::Relaxed) { let start = std::time::Instant::now(); - + match monitor_clone.capture_image() { Ok(image) => { // image is RgbaImage (Vec) @@ -336,14 +370,14 @@ async fn run_xcap_capture( std::thread::sleep(std::time::Duration::from_millis(100)); } } - + // Sleep to maintain framerate let elapsed = start.elapsed(); if elapsed < frame_duration { std::thread::sleep(frame_duration - elapsed); } else { - // Even if we are slow, sleep a tiny bit to yield CPU - std::thread::sleep(std::time::Duration::from_millis(1)); + // Even if we are slow, sleep a tiny bit to yield CPU + std::thread::sleep(std::time::Duration::from_millis(1)); } } }); @@ -357,69 +391,103 @@ async fn run_xcap_capture( } }); - // 5. Read FFmpeg stdout and distribute (Same logic as run_ffmpeg_capture) + // 5. Read FFmpeg stdout and distribute + // stdout was already taken at line 297 + + // Define chosen_filter since we are using HEVC/H.265 in the hardcoded command above + + // If you change the command to use H.264, change this to "h264_mp4toannexb" + let chosen_filter = "hevc_mp4toannexb"; + + process_ffmpeg_output( + stdout, + frame_tx, + broadcast_preview, + running, + chosen_filter.to_string(), + ) + .await?; + + let _ = child.kill().await; + stdin_task.abort(); + Ok(()) +} + +async fn process_ffmpeg_output( + stdout: tokio::process::ChildStdout, + frame_tx: tokio::sync::broadcast::Sender>, + broadcast_preview: tokio::sync::broadcast::Sender, + running: Arc, + chosen_filter: String, +) -> Result<()> { let mut reader = BufReader::new(stdout); let mut buffer = Vec::with_capacity(1024 * 1024); let mut temp_buf = [0u8; 4096]; - - loop { - if !running.load(Ordering::Relaxed) { - break; - } + while running.load(Ordering::Relaxed) { let n = match reader.read(&mut temp_buf).await { Ok(0) => break, // EOF Ok(n) => n, Err(_) => break, }; buffer.extend_from_slice(&temp_buf[0..n]); - + // Find NAL units while let Some(start_idx) = find_start_code(&buffer) { - let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) { - next_start - } else { - break; // Wait for more data - }; - - let nal_data = buffer.drain(start_idx..end_idx).collect::>(); - - // Check if it's 3-byte or 4-byte start code - let start_code_len = if nal_data[2] == 1 { 3 } else { 4 }; - - // Construct payload - let mut payload: Vec = Vec::with_capacity(1 + nal_data.len()); - - // Check NAL type (HEVC/H.265) - // NAL header is after start code. - // data[start_code_len] is the NAL header. - let nal_header_byte = nal_data[start_code_len]; - - // Type is bits 1-6 (0x7E) shifted right by 1. - let nal_type = (nal_header_byte & 0x7E) >> 1; - - // HEVC Keyframes: - // 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT) - // 32-34: VPS, SPS, PPS (Parameters - treat as critical/key) - let is_key = (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34); - - let frame_type = if is_key { 0u8 } else { 1u8 }; - - payload.push(frame_type); - payload.extend_from_slice(&nal_data); - - let _ = frame_tx.send(payload.clone()); - - let _ = broadcast_preview.send(WebMediaEvent::Video { + let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) { + next_start + } else { + break; // Wait for more data + }; + + let nal_data = buffer.drain(start_idx..end_idx).collect::>(); + + // Check if it's 3-byte or 4-byte start code + let start_code_len = if nal_data[2] == 1 { 3 } else { 4 }; + + // Construct payload + let mut payload: Vec = Vec::with_capacity(1 + nal_data.len()); + + // Check NAL type (HEVC/H.265) + // NAL header is after start code. + // data[start_code_len] is the NAL header. + let nal_header_byte = nal_data[start_code_len]; + + let is_key = if chosen_filter.contains("hevc") { + // HEVC (H.265) + // Type is bits 1-6 (0x7E) shifted right by 1. + let nal_type = (nal_header_byte & 0x7E) >> 1; + + // HEVC Keyframes: + // 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT) + // 32-34: VPS, SPS, PPS (Parameters - treat as critical/key) + (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34) + } else { + // H.264 (AVC) + // Type is lower 5 bits (0x1F) + let nal_type = nal_header_byte & 0x1F; + + // H.264 Keyframes: + // 5: IDR (Instantaneous Decoding Refresh) - Keyframe + // 7: SPS (Sequence Parameter Set) + // 8: PPS (Picture Parameter Set) + nal_type == 5 || nal_type == 7 || nal_type == 8 + }; + + let frame_type = if is_key { 0u8 } else { 1u8 }; + + payload.push(frame_type); + payload.extend_from_slice(&nal_data); + + let _ = frame_tx.send(payload.clone()); + + let _ = broadcast_preview.send(WebMediaEvent::Video { peer_id: "local".to_string(), kind: MediaKind::Screen, data: payload, }); } } - - let _ = child.kill().await; - stdin_task.abort(); Ok(()) } @@ -437,7 +505,7 @@ async fn run_video_sender_native( let (mut send, _) = network_manager .open_media_stream(peer, kind.clone()) .await?; - + // Send Start message write_framed( &mut send, @@ -456,7 +524,7 @@ async fn run_video_sender_native( // FFmpeg data is already [FrameType][VP8 Chunk], see run_ffmpeg_capture // Just wrap in protocol message let msg = MediaStreamMessage::VideoFrame { - sequence: 0, + sequence: 0, timestamp_ms: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() @@ -467,7 +535,13 @@ async fn run_video_sender_native( break; } } - Err(_) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("Video sender lagged by {} frames", n); + continue; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + break; + } } } let _ = write_framed(&mut send, &MediaStreamMessage::VideoStop { kind }).await; @@ -483,24 +557,50 @@ async fn run_ffmpeg_capture( ) -> Result<()> { let mut cmd = Command::new("ffmpeg"); cmd.kill_on_drop(true); - + // Output args: Robust Encoder Selection // Try: hevc_nvenc -> h264_nvenc -> libx264 - + struct EncoderConfig { name: &'static str, codec: &'static str, opts: Vec<&'static str>, - format: &'static str, // "hevc" or "h264" (raw stream format) - filter: &'static str, // "hevc_mp4toannexb" or "h264_mp4toannexb" + format: &'static str, // "hevc" or "h264" (raw stream format) + filter: &'static str, // "hevc_mp4toannexb" or "h264_mp4toannexb" pixel_format: Option<&'static str>, // Force pixel format if needed } let encoders = vec![ + EncoderConfig { + name: "libx264 (Software Fallback)", + codec: "libx264", + opts: vec![ + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-b:v", + "1M", + "-g", + "30", + ], + format: "h264", + filter: "h264_mp4toannexb", + pixel_format: Some("yuv420p"), // libx264 often needs yuv420p + }, EncoderConfig { name: "hevc_nvenc (Hardware)", codec: "hevc_nvenc", - opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"], + opts: vec![ + "-b:v", + "1M", + "-g", + "30", + "-zerolatency", + "1", + "-preset", + "p4", + ], format: "hevc", filter: "hevc_mp4toannexb", pixel_format: None, // NVENC usually handles formats well @@ -508,28 +608,29 @@ async fn run_ffmpeg_capture( EncoderConfig { name: "h264_nvenc (Hardware Fallback)", codec: "h264_nvenc", - opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"], + opts: vec![ + "-b:v", + "1.5M", + "-g", + "30", + "-zerolatency", + "1", + "-preset", + "p4", + ], format: "h264", filter: "h264_mp4toannexb", pixel_format: None, }, - EncoderConfig { - name: "libx264 (Software Fallback)", - codec: "libx264", - opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"], - format: "h264", - filter: "h264_mp4toannexb", - pixel_format: Some("yuv420p"), // libx264 often needs yuv420p - }, ]; let mut final_child = None; let mut chosen_filter = ""; // We need to keep the stdout/stderr open - + for enc in &encoders { tracing::info!("Trying encoder: {}", enc.name); - + let mut cmd = Command::new("ffmpeg"); cmd.kill_on_drop(true); @@ -538,28 +639,37 @@ async fn run_ffmpeg_capture( match kind { MediaKind::Camera => { cmd.args(&[ - "-f", "v4l2", - "-framerate", "30", - "-video_size", "1280x720", - "-i", "/dev/video0", + "-f", + "v4l2", + "-framerate", + "30", + "-video_size", + "1280x720", + "-i", + "/dev/video0", ]); } MediaKind::Screen => { // Always use x11grab (works on X11 and XWayland) let display_env = std::env::var("DISPLAY").unwrap_or_else(|_| ":0.0".to_string()); tracing::info!("Using x11grab on display: {}", display_env); - + cmd.args(&[ - "-f", "x11grab", - "-framerate", "30", - "-video_size", "1920x1080", // Input size (assuming 1080p for now, but safer to autodect or be large) - "-i", &display_env, - "-vf", "scale=1280:720", // Force 720p resize + "-f", + "x11grab", + "-framerate", + "30", + "-video_size", + "1920x1080", // Input size (assuming 1080p for now, but safer to autodect or be large) + "-i", + &display_env, + "-vf", + "scale=1280:720", // Force 720p resize ]); } _ => return Ok(()), } - + // Pixel format if needed if let Some(pix_fmt) = enc.pixel_format { cmd.args(&["-pix_fmt", pix_fmt]); @@ -568,10 +678,10 @@ async fn run_ffmpeg_capture( // Encoder args cmd.arg("-c:v").arg(enc.codec); cmd.args(&enc.opts); - + // Bitstream filter to ensure Annex B (start codes) cmd.arg("-bsf:v").arg(enc.filter); - + // Output format cmd.arg("-f").arg(enc.format); cmd.arg("-"); @@ -584,9 +694,13 @@ async fn run_ffmpeg_capture( // Wait a bit to see if it crashes immediately // We sleep for 500ms to let ffmpeg initialize tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - + if let Ok(Some(status)) = child.try_wait() { - tracing::warn!("Encoder {} failed immediately with status: {}", enc.name, status); + tracing::warn!( + "Encoder {} failed immediately with status: {}", + enc.name, + status + ); // Read stderr to see why if let Some(mut stderr) = child.stderr.take() { let mut err_buf = String::new(); @@ -595,11 +709,11 @@ async fn run_ffmpeg_capture( } continue; // Try next } - + // It seems to be running tracing::info!("Selected encoder: {}", enc.name); tracing::info!("Capture loop started"); - + // Redirect stderr to log file or tracing if let Some(stderr) = child.stderr.take() { // Spawn a task to log stderr line by line @@ -612,7 +726,10 @@ async fn run_ffmpeg_capture( Ok(0) => break, // EOF Ok(_) => { // Log errors or critical warnings - if line.contains("Error") || line.contains("error") || line.contains("fail") { + if line.contains("Error") + || line.contains("error") + || line.contains("fail") + { tracing::error!("FFmpeg: {}", line.trim()); } line.clear(); @@ -622,7 +739,7 @@ async fn run_ffmpeg_capture( } }); } - + final_child = Some(child); chosen_filter = enc.filter; break; @@ -643,115 +760,14 @@ async fn run_ffmpeg_capture( }; let stdout = child.stdout.take().expect("Failed to open stdout"); - // We don't need BufReader for raw check if we just read blocks, but fine to use it or just AsyncRead - let mut reader = BufReader::new(stdout); - - // Raw H.264 parsing (Annex B) - // Stream is a sequence of NAL units, each starting with 00 00 00 01 (or 00 00 01) - // We need to buffer and split. - - let mut buffer = Vec::with_capacity(1024 * 1024); // 1MB buffer - let mut temp_buf = [0u8; 4096]; - - while running.load(Ordering::Relaxed) { - let n = match reader.read(&mut temp_buf).await { - Ok(0) => break, // EOF - Ok(n) => n, - Err(_) => break, - }; - buffer.extend_from_slice(&temp_buf[0..n]); - - // Find NAL units in buffer - // A NAL unit starts with 00 00 00 01 - // We look for start codes. - - while let Some(start_idx) = find_start_code(&buffer) { - // If we found a start code at index 0, we can't extract a frame yet - // unless we have another start code later. - // But actually, the buffer MIGHT have multiple NALs. - - // We need to find the NEXT start code to know where this one ends. - // Search from start_idx + 4 - - let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) { - next_start - } else { - // No next start code yet. - // If the buffer is getting huge, maybe we should just consume it? - // But for H.264 streaming we must be precise. - // Wait for more data. - break; - }; - - // Extract NAL unit (including start code? Browsers usually want it for AVC1/AnnexB) - // WebCodecs EncodedVideoChunk expects: - // "For 'avc1' (H.264), the chunk data must be an Annex B NAL unit." - // So we include the 00 00 00 01. - - let nal_data = buffer.drain(start_idx..end_idx).collect::>(); - - // Send NAL - // Frame Type detection for H.264: - // NAL type is in the first byte AFTER the start code. - // Start Code is 00 00 00 01 (4 bytes) or 00 00 01 (3 bytes) - // find_start_code finds 00 00 00 01. - - // Let's handle 3-byte start codes too? ffmpeg -f h264 usually sends 4-byte. - - // Check if it's 3-byte or 4-byte start code - let start_code_len = if nal_data[2] == 1 { 3 } else { 4 }; - - // Construct payload - let mut payload: Vec = Vec::with_capacity(1 + nal_data.len()); - - // Check NAL type - // nal_data[start_code_len] is the NAL header (first byte). - let nal_header_byte = nal_data[start_code_len]; - - let is_key = if chosen_filter.contains("hevc") { - // HEVC (H.265) - // Type is bits 1-6 (0x7E) shifted right by 1. - let nal_type = (nal_header_byte & 0x7E) >> 1; - - // HEVC Keyframes: - // 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT) - // 32-34: VPS, SPS, PPS (Parameters - treat as critical/key) - (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34) - } else { - // H.264 (AVC) - // Type is lower 5 bits (0x1F) - let nal_type = nal_header_byte & 0x1F; - - // H.264 Keyframes: - // 5: IDR (Instantaneous Decoding Refresh) - Keyframe - // 7: SPS (Sequence Parameter Set) - // 8: PPS (Picture Parameter Set) - nal_type == 5 || nal_type == 7 || nal_type == 8 - }; - - let frame_type = if is_key { 0u8 } else { 1u8 }; - - payload.push(frame_type); - payload.extend_from_slice(&nal_data); - - // Send to peers - let _ = frame_tx.send(payload.clone()); - - // Send to local web preview - let _ = broadcast_preview.send(WebMediaEvent::Video { - peer_id: "local".to_string(), - kind: kind.clone(), - data: payload, - }); - - // buffer now starts at what was end_idx (because of drain) - // drain removes items, so indexes shift. - // wait, drain(start..end) removes items. buffer automatically shrinks. - // so we loop again to see if there is ANOTHER start code at 0? - // Actually, `find_start_code` searches from 0. - // If we drained 0..end_idx, the next bytes are at 0. - } - } + process_ffmpeg_output( + stdout, + frame_tx, + broadcast_preview, + running, + chosen_filter.to_string(), + ) + .await?; let _ = child.kill().await; Ok(()) @@ -762,10 +778,12 @@ fn find_start_code(data: &[u8]) -> Option { } fn find_start_code_from(data: &[u8], start: usize) -> Option { - if data.len() < 3 { return None; } + if data.len() < 3 { + return None; + } for i in start..data.len() - 2 { // Look for 00 00 01 - if data[i] == 0 && data[i+1] == 0 && data[i+2] == 1 { + if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 { // Check if it's actually 00 00 00 01 (4 bytes) // If i > 0 and data[i-1] == 0, then the start code might have been at i-1 // But we iterate forward. @@ -776,13 +794,13 @@ fn find_start_code_from(data: &[u8], start: usize) -> Option { // i=0: 00 00 00 -> No. // i=1: 00 00 01 -> Yes. Return 1. // But the start code is at 0! - + // Correct logic: // If we find 00 00 01 at i, check if i > 0 and data[i-1] == 0. // If so, the start code is at i-1 (4 bytes). // Return i-1. - if i > start && data[i-1] == 0 { - return Some(i-1); + if i > start && data[i - 1] == 0 { + return Some(i - 1); } return Some(i); } @@ -794,31 +812,6 @@ fn find_start_code_from(data: &[u8], start: usize) -> Option { // Wayland Capture Logic (ashpd) // --------------------------------------------------------------------------- -async fn select_wayland_source() -> Result<(u32, AshpdKeeper)> { - let proxy = Screencast::new().await?; - let session = proxy.create_session().await?; - - proxy - .select_sources( - &session, - CursorMode::Metadata, - SourceType::Monitor | SourceType::Window, - false, - None, - PersistMode::DoNot, - ) - .await?; - - let request = proxy.start(&session, &ashpd::WindowIdentifier::default()).await?; - let streams = request.response()?; - - if let Some(stream) = streams.streams().iter().next() { - Ok((stream.pipe_wire_node_id(), AshpdKeeper { _proxy: proxy, _session: session })) - } else { - Err(anyhow::anyhow!("No pipewire stream returned from portal")) - } -} - async fn run_pipewire_capture( node_id: u32, _keeper: AshpdKeeper, @@ -828,9 +821,26 @@ async fn run_pipewire_capture( ) -> Result<()> { tracing::info!("Starting PipeWire capture for node: {}", node_id); + // Build pipeline: Robust ordering + // pipewiresrc -> videoconvert -> videoscale -> caps(720p) -> videorate -> caps(30fps) -> appsink + // We add caps *after* videoscale to enforce resolution, and *after* videorate to enforce fps. + let pipeline_str = format!( + "pipewiresrc path={} ! videoconvert ! videoscale ! video/x-raw,format=I420,width=1280,height=720 ! videorate ! video/x-raw,framerate=30/1 ! appsink name=sink sync=false drop=true max-buffers=1", + node_id + ); + + let capture = gstreamer::GStreamerCapture::new(&pipeline_str)?; + + // Channel to receive frames from GStreamer + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::>(); + + capture.set_callback(move |data| { + let _ = tx.send(data.to_vec()); + }); + + capture.start()?; + // Encoder Selection (Hardware preferred) - // Note: PipeWire input is raw, so we can encode it. - struct EncoderConfig { name: &'static str, codec: &'static str, @@ -841,16 +851,18 @@ async fn run_pipewire_capture( let encoders = vec![ EncoderConfig { - name: "hevc_nvenc (Hardware)", - codec: "hevc_nvenc", - opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"], - format: "hevc", - filter: "hevc_mp4toannexb", - }, - EncoderConfig { - name: "h264_nvenc (Hardware Fallback)", - codec: "h264_nvenc", - opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"], + name: "libx264 (Software Fallback)", + codec: "libx264", + opts: vec![ + "-preset", + "ultrafast", + "-tune", + "zerolatency", + "-b:v", + "1M", + "-g", + "30", + ], format: "h264", filter: "h264_mp4toannexb", }, @@ -858,17 +870,47 @@ async fn run_pipewire_capture( name: "hevc_vaapi (VAAPI HEVC)", codec: "hevc_vaapi", opts: vec![ - "-vaapi_device", "/dev/dri/renderD128", - "-vf", "format=nv12,hwupload,scale_vaapi=w=1280:h=720", - "-b:v", "1M", "-g", "30" + "-vaapi_device", + "/dev/dri/renderD128", + "-vf", + "format=nv12,hwupload,scale_vaapi=w=1280:h=720", + "-b:v", + "1M", + "-g", + "30", ], format: "hevc", filter: "hevc_mp4toannexb", }, EncoderConfig { - name: "libx264 (Software Fallback)", - codec: "libx264", - opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"], + name: "hevc_nvenc (Hardware)", + codec: "hevc_nvenc", + opts: vec![ + "-b:v", + "1M", + "-g", + "30", + "-zerolatency", + "1", + "-preset", + "p4", + ], + format: "hevc", + filter: "hevc_mp4toannexb", + }, + EncoderConfig { + name: "h264_nvenc (Hardware Fallback)", + codec: "h264_nvenc", + opts: vec![ + "-b:v", + "1.5M", + "-g", + "30", + "-zerolatency", + "1", + "-preset", + "p4", + ], format: "h264", filter: "h264_mp4toannexb", }, @@ -879,18 +921,18 @@ async fn run_pipewire_capture( for enc in &encoders { tracing::info!("Trying encoder: {}", enc.name); - + let mut cmd = Command::new("ffmpeg"); cmd.kill_on_drop(true); cmd.args(&[ - "-f", "pipewire", - "-framerate", "30", - "-i", &node_id.to_string(), + "-f", "rawvideo", "-pix_fmt", "yuv420p", "-s", "1280x720", "-r", "30", "-i", "-", ]); - + if !enc.name.contains("vaapi") { - cmd.args(&["-vf", "scale=1280:720"]); + // Already scaled in GStreamer, but let's be safe or just pass through? + // If we rely on GStreamer scaling, we don't need scale filter here. + // But if VAAPI needs it... } cmd.arg("-c:v").arg(enc.codec); @@ -899,6 +941,7 @@ async fn run_pipewire_capture( cmd.arg("-f").arg(enc.format); cmd.arg("-"); + cmd.stdin(Stdio::piped()); cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); @@ -906,7 +949,7 @@ async fn run_pipewire_capture( Ok(mut child) => { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; if let Ok(Some(_)) = child.try_wait() { - continue; + continue; } final_child = Some(child); chosen_filter = enc.filter; @@ -915,59 +958,57 @@ async fn run_pipewire_capture( Err(_) => continue, } } - + let mut child = match final_child { Some(c) => c, None => return Err(anyhow::anyhow!("All encoders failed for PipeWire")), }; + let mut stdin = child.stdin.take().expect("Failed to open ffmpeg stdin"); let stdout = child.stdout.take().expect("Failed to open stdout"); - let mut reader = BufReader::new(stdout); - let mut buffer = Vec::with_capacity(1024 * 1024); - let mut temp_buf = [0u8; 4096]; - - while running.load(Ordering::Relaxed) { - let n = match reader.read(&mut temp_buf).await { - Ok(0) => break, - Ok(n) => n, - Err(_) => break, - }; - buffer.extend_from_slice(&temp_buf[0..n]); - - while let Some(start_idx) = find_start_code(&buffer) { - let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) { - next_start - } else { - break; - }; - - let nal_data = buffer.drain(start_idx..end_idx).collect::>(); - let start_code_len = if nal_data[2] == 1 { 3 } else { 4 }; - let nal_header_byte = nal_data[start_code_len]; - - let is_key = if chosen_filter.contains("hevc") { - let nal_type = (nal_header_byte & 0x7E) >> 1; - (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34) - } else { - let nal_type = nal_header_byte & 0x1F; - nal_type == 5 || nal_type == 7 || nal_type == 8 - }; - - let frame_type = if is_key { 0u8 } else { 1u8 }; - - let mut payload = Vec::with_capacity(1 + nal_data.len()); - payload.push(frame_type); - payload.extend_from_slice(&nal_data); - - let _ = frame_tx.send(payload.clone()); - let _ = broadcast_preview.send(WebMediaEvent::Video { - peer_id: "local".to_string(), - kind: MediaKind::Screen, - data: payload, - }); - } + + // Capture stderr for debugging + if let Some(stderr) = child.stderr.take() { + let running_log = running.clone(); + tokio::spawn(async move { + let mut reader = BufReader::new(stderr); + let mut line = String::new(); + while running_log.load(Ordering::Relaxed) { + match reader.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => { + if line.contains("Error") || line.contains("error") || line.contains("fail") + { + tracing::error!("FFmpeg PipeWire Error: {}", line.trim()); + } + line.clear(); + } + Err(_) => break, + } + } + }); } + // Spawn async task to write frames to FFmpeg stdin + let stdin_task = tokio::spawn(async move { + while let Some(frame_data) = rx.recv().await { + if stdin.write_all(&frame_data).await.is_err() { + break; + } + } + }); + + process_ffmpeg_output( + stdout, + frame_tx, + broadcast_preview, + running, + chosen_filter.to_string(), + ) + .await?; + let _ = child.kill().await; + stdin_task.abort(); + let _ = capture.stop(); Ok(()) } diff --git a/src/media/capture/portal.rs b/src/media/capture/portal.rs new file mode 100644 index 0000000..90ce235 --- /dev/null +++ b/src/media/capture/portal.rs @@ -0,0 +1,43 @@ +use anyhow::Result; +use ashpd::desktop::{ + screencast::{CursorMode, Screencast, SourceType}, + PersistMode, +}; + +pub struct AshpdKeeper { + pub _proxy: Screencast<'static>, + pub _session: ashpd::desktop::Session<'static, Screencast<'static>>, +} + +pub async fn select_wayland_source() -> Result<(u32, AshpdKeeper)> { + let proxy = Screencast::new().await?; + let session = proxy.create_session().await?; + + proxy + .select_sources( + &session, + CursorMode::Embedded, + SourceType::Monitor | SourceType::Window, + false, + None, + PersistMode::DoNot, + ) + .await?; + + let request = proxy + .start(&session, &ashpd::WindowIdentifier::default()) + .await?; + let streams = request.response()?; + + if let Some(stream) = streams.streams().iter().next() { + Ok(( + stream.pipe_wire_node_id(), + AshpdKeeper { + _proxy: proxy, + _session: session, + }, + )) + } else { + Err(anyhow::anyhow!("No pipewire stream returned from portal")) + } +} diff --git a/src/media/mod.rs b/src/media/mod.rs index b923b18..b28eab5 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -7,6 +7,7 @@ //! Each feature is runtime-toggleable and runs on dedicated threads/tasks. pub mod capture; +pub mod playback; pub mod voice; use iroh::EndpointId; @@ -36,6 +37,11 @@ pub enum WebMediaEvent { }, } +pub enum InternalMediaEvent { + VideoStart(EndpointId), + VideoStop(EndpointId), +} + /// Tracks all active media sessions. pub struct MediaState { /// Active voice chat session (if any). @@ -50,18 +56,32 @@ pub struct MediaState { // Input channels (from Web -> MediaState -> Peers) pub mic_broadcast: tokio::sync::broadcast::Sender>, pub screen_broadcast: tokio::sync::broadcast::Sender>, + // Channel for integrated video player frames (RGBA, width, height) + pub video_frame_tx: tokio::sync::broadcast::Sender<(Vec, u32, u32)>, pub mic_bitrate: Arc, pub input_device: Option, pub output_device: Option, pub initial_master_volume: f32, + pub initial_mic_volume: f32, pub initial_noise_suppression: bool, + pub active_video_sessions: Arc>, + pub media_event_tx: Option>, } impl MediaState { - pub fn new(mic_bitrate: u32, input_device: Option, output_device: Option, master_volume: f32, noise_suppression: bool) -> Self { + pub fn new( + mic_bitrate: u32, + input_device: Option, + output_device: Option, + master_volume: f32, + mic_volume: f32, + noise_suppression: bool, + media_event_tx: Option>, + ) -> Self { let (broadcast_tx, _) = tokio::sync::broadcast::channel(100); let (mic_broadcast, _) = tokio::sync::broadcast::channel(100); let (screen_broadcast, _) = tokio::sync::broadcast::channel(100); + let (video_frame_tx, _) = tokio::sync::broadcast::channel(10); // Low buffer for live video Self { voice: None, screen: None, @@ -69,11 +89,15 @@ impl MediaState { broadcast_tx, mic_broadcast, screen_broadcast, + video_frame_tx, mic_bitrate: Arc::new(AtomicU32::new(mic_bitrate)), input_device, output_device, initial_master_volume: master_volume, + initial_mic_volume: mic_volume, initial_noise_suppression: noise_suppression, + active_video_sessions: Arc::new(dashmap::DashSet::new()), + media_event_tx, } } @@ -91,35 +115,36 @@ impl MediaState { pub fn get_input_devices(&self) -> Vec { let mut names = Vec::new(); - + // Prioritize JACK if available, otherwise ALSA/Pulse/WASAPI let available_hosts = cpal::available_hosts(); let mut hosts = Vec::new(); - + // Push JACK first if available if available_hosts.contains(&cpal::HostId::Jack) { hosts.push(cpal::host_from_id(cpal::HostId::Jack).unwrap()); } - + // Then default host hosts.push(cpal::default_host()); for host in hosts { - if let Ok(devices) = host.input_devices() { + if let Ok(devices) = host.input_devices() { for device in devices { if let Ok(name) = device.name() { // Filter out common noise/unusable devices - if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") { + if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") + { continue; } - + // Clean up ALSA names // Example: "sysdefault:CARD=PCH" -> "PCH (sysdefault)" // Example: "front:CARD=Microphone,DEV=0" -> "Microphone (front)" let clean_name = if let Some(start) = name.find("CARD=") { let rest = &name[start + 5..]; let card_name = rest.split(',').next().unwrap_or(rest); - + let prefix = name.split(':').next().unwrap_or("Unknown"); format!("{} ({})", card_name, prefix) } else if name.contains("HDA Intel PCH") { @@ -128,13 +153,13 @@ impl MediaState { } else { name }; - + names.push(clean_name); } } - } + } } - + // Dedup and sort names.sort(); names.dedup(); @@ -143,24 +168,25 @@ impl MediaState { pub fn get_output_devices(&self) -> Vec { let mut names = Vec::new(); - + // Prioritize JACK if available let available_hosts = cpal::available_hosts(); let mut hosts = Vec::new(); - + if available_hosts.contains(&cpal::HostId::Jack) { hosts.push(cpal::host_from_id(cpal::HostId::Jack).unwrap()); } hosts.push(cpal::default_host()); for host in hosts { - if let Ok(devices) = host.output_devices() { + if let Ok(devices) = host.output_devices() { for device in devices { if let Ok(name) = device.name() { - if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") { + if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") + { continue; } - + let clean_name = if let Some(start) = name.find("CARD=") { let rest = &name[start + 5..]; let card_name = rest.split(',').next().unwrap_or(rest); @@ -169,13 +195,13 @@ impl MediaState { } else { name }; - + names.push(clean_name); } } - } + } } - + names.sort(); names.dedup(); names @@ -191,6 +217,12 @@ impl MediaState { } } + pub fn set_mic_volume(&self, volume: f32) { + if let Some(voice) = &self.voice { + voice.set_input_volume(volume); + } + } + pub fn get_volume(&self) -> f32 { if let Some(voice) = &self.voice { voice.get_volume() @@ -199,6 +231,12 @@ impl MediaState { } } + pub fn get_mic_volume(&self) -> f32 { + // VoiceChat doesn't strictly expose get_input_volume yet but we know it + // Let's assume we track it or just return initial if not active + self.initial_mic_volume // TODO: Fetch from VoiceChat if active? VoiceChat stores it in Atomic. + } + pub fn is_denoise_enabled(&self) -> bool { if let Some(voice) = &self.voice { voice.is_denoise_enabled() @@ -258,6 +296,7 @@ impl MediaState { self.input_device.clone(), self.output_device.clone(), // Added output device self.initial_master_volume, + self.initial_mic_volume, // Pass initial mic volume self.initial_noise_suppression, ) { Ok(vc) => { @@ -281,7 +320,7 @@ impl MediaState { } else { // Start let peers = net.peers.lock().await; - + // Use Native Capture (FFmpeg) match VideoCapture::start_native( MediaKind::Screen, @@ -317,7 +356,10 @@ impl MediaState { mut recv: iroh::endpoint::RecvStream, ) { let broadcast_tx = self.broadcast_tx.clone(); - + let video_frame_tx = self.video_frame_tx.clone(); + let active_sessions = self.active_video_sessions.clone(); + let media_event_tx = self.media_event_tx.clone(); + // Spawn a task to determine stream type and handle it let handle = tokio::spawn(async move { // Read first message to determine type. @@ -325,16 +367,36 @@ impl MediaState { Ok(msg) => match msg { MediaStreamMessage::AudioStart { .. } => { // DEPRECATED in Native Datagram mode - tracing::warn!("Received Audio stream from {} (unexpected in datagram mode)", from); - // We could support stream fallback, but for now we ignore or log. - // Or we can close it. + tracing::warn!( + "Received Audio stream from {} (unexpected in datagram mode)", + from + ); } MediaStreamMessage::VideoStart { .. } => { tracing::info!("Accepted Video stream from {:?}", from); - if let Err(e) = - VideoCapture::handle_incoming_video_native(from, msg, recv, broadcast_tx) - .await - { + active_sessions.insert(from); + + // Notify start + if let Some(tx) = &media_event_tx { + let _ = tx.send(InternalMediaEvent::VideoStart(from)).await; + } + + let result = VideoCapture::handle_incoming_video_native( + from, + msg, + recv, + broadcast_tx, + video_frame_tx, + ) + .await; + + active_sessions.remove(&from); + // Notify stop + if let Some(tx) = &media_event_tx { + let _ = tx.send(InternalMediaEvent::VideoStop(from)).await; + } + + if let Err(e) = result { tracing::error!("Video native playback error: {}", e); } } @@ -362,15 +424,18 @@ impl MediaState { /// Handle an incoming datagram (unreliable audio/video). pub fn handle_incoming_datagram(&mut self, from: EndpointId, data: bytes::Bytes) { - if data.is_empty() { return; } - + if data.is_empty() { + return; + } + // Check first byte for type match data[0] { - 1 => { // Audio - if let Some(voice) = &mut self.voice { - voice.handle_datagram(from, data); - } - }, + 1 => { + // Audio + if let Some(voice) = &mut self.voice { + voice.handle_datagram(from, data); + } + } // 2 => Video? _ => { // tracing::trace!("Unknown datagram type: {}", data[0]); @@ -410,9 +475,3 @@ impl MediaState { } } } - -impl Drop for MediaState { - fn drop(&mut self) { - self.shutdown(); - } -} diff --git a/src/media/playback.rs b/src/media/playback.rs new file mode 100644 index 0000000..5bb7b59 --- /dev/null +++ b/src/media/playback.rs @@ -0,0 +1,98 @@ +use anyhow::Result; +use gstreamer::prelude::*; +use gstreamer::{ElementFactory, Pipeline, State}; +use gstreamer_app::{AppSink, AppSrc}; +use std::sync::{Arc, Mutex}; + +pub struct VideoPlayer { + pipeline: Pipeline, + appsrc: AppSrc, + appsink: AppSink, +} + +impl VideoPlayer { + pub fn new(on_frame: F) -> Result + where + F: Fn(Vec, u32, u32) + Send + Sync + 'static, + { + gstreamer::init()?; + + // Pipeline: appsrc -> decodebin -> videoconvert -> appsink + // We use decodebin to handle both H.264 and HEVC automatically + // output format RGBA for Iced + let pipeline_str = "appsrc name=src is-live=true format=time do-timestamp=true ! \ + parsebin ! \ + decodebin ! \ + videoconvert ! \ + video/x-raw,format=RGBA ! \ + appsink name=sink drop=true max-buffers=1 sync=false"; + + let pipeline = gstreamer::parse::launch(pipeline_str)? + .downcast::() + .map_err(|_| anyhow::anyhow!("Expected pipeline"))?; + + let appsrc = pipeline + .by_name("src") + .ok_or_else(|| anyhow::anyhow!("Missing src"))? + .downcast::() + .map_err(|_| anyhow::anyhow!("src is not appsrc"))?; + + let appsink = pipeline + .by_name("sink") + .ok_or_else(|| anyhow::anyhow!("Missing sink"))? + .downcast::() + .map_err(|_| anyhow::anyhow!("sink is not appsink"))?; + + // Set up callback + let callbacks = gstreamer_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?; + let buffer = sample.buffer().ok_or(gstreamer::FlowError::Error)?; + + // Get caps to know width/height + let caps = sample.caps().ok_or(gstreamer::FlowError::Error)?; + let structure = caps.structure(0).ok_or(gstreamer::FlowError::Error)?; + let width = structure + .get::("width") + .map_err(|_| gstreamer::FlowError::Error)? as u32; + let height = structure + .get::("height") + .map_err(|_| gstreamer::FlowError::Error)? as u32; + + let map = buffer + .map_readable() + .map_err(|_| gstreamer::FlowError::Error)?; + + on_frame(map.to_vec(), width, height); + + Ok(gstreamer::FlowSuccess::Ok) + }) + .build(); + + appsink.set_callbacks(callbacks); + + Ok(Self { + pipeline, + appsrc, + appsink, + }) + } + + pub fn start(&self) -> Result<()> { + self.pipeline.set_state(State::Playing)?; + Ok(()) + } + + pub fn stop(&self) -> Result<()> { + let _ = self.pipeline.set_state(State::Null); + Ok(()) + } + + pub fn push_data(&self, data: &[u8]) -> Result<()> { + let buffer = gstreamer::Buffer::from_slice(data.to_vec()); + self.appsrc.push_buffer(buffer)?; + Ok(()) + } +} + +// Remove Drop impl to prevent panic on shutdown. Rely on explicit stop() or process exit. diff --git a/src/media/voice.rs b/src/media/voice.rs index ac02f61..471efaf 100644 --- a/src/media/voice.rs +++ b/src/media/voice.rs @@ -6,11 +6,10 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; -use anyhow::{Result, anyhow}; -use dashmap::DashMap; -use nnnoiseless::DenoiseState; +use crate::media::WebMediaEvent; +use anyhow::{anyhow, Result}; use audiopus::{ coder::Decoder as OpusDecoder, coder::Encoder as OpusEncoder, Application, Bitrate, Channels, SampleRate, @@ -18,9 +17,10 @@ use audiopus::{ use bytes::Bytes; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; use crossbeam_channel::{unbounded, Receiver, Sender}; +use dashmap::DashMap; use iroh::EndpointId; +use nnnoiseless::DenoiseState; use ringbuf::{traits::*, HeapRb}; -use crate::media::WebMediaEvent; const PACKET_TYPE_AUDIO: u8 = 1; const FRAME_SIZE_SAMPLES: usize = 960; // 20ms at 48kHz @@ -51,46 +51,53 @@ impl std::ops::DerefMut for SyncAudioProducer { type AudioProducer = SyncAudioProducer; type AudioConsumer = ringbuf::HeapCons; - - /// Main voice chat coordination. pub struct VoiceChat { running: Arc, tasks: Vec>, - + // Capture and Playback threads capture_thread: Option>, playback_thread: Option>, - + // Per-peer state: Decoder + Jitter Buffer Producer peer_audio_sinks: HashMap, - + // Channel to notify playback thread of new peers new_peer_tx: Sender<(EndpointId, AudioConsumer)>, - + // Audio processing controls pub denoise_enabled: Arc, pub output_volume: Arc, // stored as f32 bits + pub input_volume: Arc, // stored as f32 bits pub peer_levels: Arc>, } +// impl Drop for VoiceChat { +// fn drop(&mut self) { +// self.stop(); +// } +// } + impl VoiceChat { /// Start voice chat session (Native Version with CPAL + QUIC Datagrams). pub fn start_native( net: crate::net::NetworkManager, peers: Vec, mic_tx: tokio::sync::broadcast::Sender>, - _mic_rx: tokio::sync::broadcast::Receiver>, + _mic_rx: tokio::sync::broadcast::Receiver>, _broadcast_tx: tokio::sync::broadcast::Sender, mic_bitrate: Arc, input_device_name: Option, output_device_name: Option, initial_volume: f32, + initial_mic_volume: f32, initial_denoise: bool, ) -> Result { let running = Arc::new(AtomicBool::new(true)); let denoise_enabled = Arc::new(AtomicBool::new(initial_denoise)); let output_volume = Arc::new(AtomicU32::new(initial_volume.to_bits())); + let input_volume = Arc::new(AtomicU32::new(initial_mic_volume.to_bits())); let peer_levels = Arc::new(DashMap::new()); tracing::info!("Starting Native Voice Chat..."); @@ -101,9 +108,15 @@ impl VoiceChat { let playback_device_name = output_device_name.clone(); let playback_volume = output_volume.clone(); let playback_levels = peer_levels.clone(); - + let playback_thread = thread::spawn(move || { - run_playback_loop(playback_running, new_peer_rx, playback_device_name, playback_volume, playback_levels); + run_playback_loop( + playback_running, + new_peer_rx, + playback_device_name, + playback_volume, + playback_levels, + ); }); // 2. Setup Capture Thread (CPAL Input) @@ -111,9 +124,16 @@ impl VoiceChat { let mic_tx_capture = mic_tx.clone(); let capture_device_name = input_device_name.clone(); let capture_denoise = denoise_enabled.clone(); - + let capture_volume = input_volume.clone(); + let capture_thread = thread::spawn(move || { - run_capture_loop(capture_running, mic_tx_capture, capture_device_name, capture_denoise); + run_capture_loop( + capture_running, + mic_tx_capture, + capture_device_name, + capture_denoise, + capture_volume, + ); }); // 3. Setup Network Sender Task (Opus -> Datagrams) @@ -130,7 +150,8 @@ impl VoiceChat { mic_rx_sender, sender_running, mic_bitrate_clone, - ).await; + ) + .await; }); tasks.push(sender_task); @@ -143,12 +164,18 @@ impl VoiceChat { new_peer_tx, denoise_enabled, output_volume, + input_volume, peer_levels, }) } - + pub fn set_volume(&self, volume: f32) { - self.output_volume.store(volume.to_bits(), Ordering::Relaxed); + self.output_volume + .store(volume.to_bits(), Ordering::Relaxed); + } + + pub fn set_input_volume(&self, volume: f32) { + self.input_volume.store(volume.to_bits(), Ordering::Relaxed); } pub fn get_volume(&self) -> f32 { @@ -166,9 +193,12 @@ impl VoiceChat { } pub fn get_peer_levels(&self) -> HashMap { - self.peer_levels.iter().map(|entry| (*entry.key(), *entry.value())).collect() + self.peer_levels + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect() } - + // Kept for compatibility but unused in Native mode pub fn start_web( _net: crate::net::NetworkManager, @@ -177,7 +207,7 @@ impl VoiceChat { _broadcast_tx: tokio::sync::broadcast::Sender, _mic_bitrate: Arc, ) -> Result { - Err(anyhow!("Web voice not supported in this native build")) + Err(anyhow!("Web voice not supported in this native build")) } /// Stop voice chat. @@ -187,7 +217,8 @@ impl VoiceChat { task.abort(); } self.tasks.clear(); - + + // Spawn a thread to join audio threads to avoid blocking async context if they are stuck if let Some(t) = self.capture_thread.take() { t.thread().unpark(); } @@ -213,7 +244,7 @@ impl VoiceChat { // Get or create decoder/producer for this peer let (decoder, producer) = self.peer_audio_sinks.entry(from).or_insert_with(|| { tracing::info!("New voice peer detected: {}", from); - + // Create Jitter Buffer (RingBuf) // 48kHz * 1s buffer let rb = HeapRb::::new(48000); @@ -226,7 +257,7 @@ impl VoiceChat { let decoder = OpusDecoder::new(SampleRate::Hz48000, Channels::Mono) .expect("Failed to create Opus decoder"); - + (SendDecoder(decoder), SyncAudioProducer(prod)) }); @@ -254,18 +285,26 @@ fn run_capture_loop( mic_tx: tokio::sync::broadcast::Sender>, device_name: Option, denoise_enabled: Arc, + input_volume: Arc, ) { let host = cpal::default_host(); - + // Find device let device = if let Some(ref name) = device_name { - host.input_devices().ok().and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false))) + tracing::info!("Requesting input device: '{}'", name); + host.input_devices() + .ok() + .and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false))) } else { + tracing::info!("Requesting default input device"); host.default_input_device() }; let device = match device { - Some(d) => d, + Some(d) => { + tracing::info!("Found input device: {:?}", d.name()); + d + } None => { tracing::error!("No input device found"); return; @@ -281,10 +320,10 @@ fn run_capture_loop( return; } }; - + // We try to stick to default but standardise to 1 channel if possible. let stream_config: cpal::StreamConfig = config.clone().into(); - + tracing::info!("Input config: {:?}", stream_config); // Initialize RNNoise @@ -293,6 +332,9 @@ fn run_capture_loop( let mut processing_buffer: Vec = Vec::with_capacity(480 * 2); let mut out_buf = [0.0f32; DenoiseState::FRAME_SIZE]; + let mut last_log = Instant::now(); + let mut packet_count = 0; + let err_fn = |err| tracing::error!("Input stream error: {}", err); let stream = match config.sample_format() { @@ -302,21 +344,36 @@ fn run_capture_loop( &stream_config, move |data: &[f32], _: &_| { if !running_clone.load(Ordering::Relaxed) { return; } - + + packet_count += 1; + if last_log.elapsed() >= Duration::from_secs(1) { + tracing::info!("Microphone input active: {} callbacks/sec, {} samples in last callback", packet_count, data.len()); + packet_count = 0; + last_log = Instant::now(); + } + // Convert to Mono let channels = stream_config.channels as usize; - let mono_samples: Vec = if channels == 1 { + let mut mono_samples: Vec = if channels == 1 { data.to_vec() } else { data.chunks(channels).map(|chunk| chunk.iter().sum::() / channels as f32).collect() }; - + + // Apply Input Gain + let gain = f32::from_bits(input_volume.load(Ordering::Relaxed)); + if gain != 1.0 { + for sample in &mut mono_samples { + *sample *= gain; + } + } + if !mono_samples.is_empty() { let use_denoise = denoise_enabled.load(Ordering::Relaxed); - + if use_denoise { processing_buffer.extend_from_slice(&mono_samples); - + while processing_buffer.len() >= DenoiseState::FRAME_SIZE { let chunk: Vec = processing_buffer.drain(0..DenoiseState::FRAME_SIZE).collect(); denoise_state.process_frame(&mut out_buf, &chunk); @@ -331,10 +388,10 @@ fn run_capture_loop( err_fn, None ) - }, + } _ => { - tracing::error!("Input device does not support F32 samples"); - return; + tracing::error!("Input device does not support F32 samples"); + return; } }; @@ -343,7 +400,7 @@ fn run_capture_loop( tracing::error!("Failed to play input stream: {}", e); } tracing::info!("Voice started (Capture)"); - + // Keep thread alive while running.load(Ordering::Relaxed) { thread::sleep(Duration::from_millis(100)); @@ -361,15 +418,22 @@ fn run_playback_loop( peer_levels: Arc>, ) { let host = cpal::default_host(); - + let device = if let Some(ref name) = device_name { - host.output_devices().ok().and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false))) + tracing::info!("Requesting output device: '{}'", name); + host.output_devices() + .ok() + .and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false))) } else { + tracing::info!("Requesting default output device"); host.default_output_device() }; let device = match device { - Some(d) => d, + Some(d) => { + tracing::info!("Found output device: {:?}", d.name()); + d + } None => { tracing::error!("No output device found"); return; @@ -391,14 +455,26 @@ fn run_playback_loop( let err_fn = |err| tracing::error!("Output stream error: {}", err); + let mut last_log = Instant::now(); + let mut packet_count = 0; + let stream = match config.sample_format() { cpal::SampleFormat::F32 => { let running_clone = running.clone(); device.build_output_stream( &stream_config, move |data: &mut [f32], _: &_| { - if !running_clone.load(Ordering::Relaxed) { return; } - + if !running_clone.load(Ordering::Relaxed) { + return; + } + + packet_count += 1; + if last_log.elapsed() >= Duration::from_secs(1) { + tracing::info!("Speaker output active: {} callbacks/sec", packet_count); + packet_count = 0; + last_log = Instant::now(); + } + let master_vol = f32::from_bits(output_volume.load(Ordering::Relaxed)); // Check for new peers non-blocking @@ -409,10 +485,10 @@ fn run_playback_loop( // Mix let channels = stream_config.channels as usize; - + // We assume we are filling interleaved buffer. // Our ringbufs are Mono. We duplicate mono to all channels. - + // Pre-allocate level accumulators for this frame // We'll calculate RMS over the whole buffer size for UI visualization let mut peer_sums: HashMap = HashMap::new(); @@ -421,43 +497,43 @@ fn run_playback_loop( // Iterate output buffer frame by frame (all channels per sample time) for frame in data.chunks_mut(channels) { let mut sum: f32 = 0.0; - + // Sum up all peers for (id, c) in consumers.iter_mut() { if let Some(sample) = c.try_pop() { sum += sample; - + // Accumulate squared sample for RMS *peer_sums.entry(*id).or_default() += sample * sample; *peer_counts.entry(*id).or_default() += 1; } } - + // Apply master volume sum *= master_vol; - + // Soft clip let mixed = sum.clamp(-1.0, 1.0); - + // Assign to all channels for sample in frame.iter_mut() { *sample = mixed; } } - + // Update peer levels in shared map for (id, sq_sum) in peer_sums { let count = peer_counts.get(&id).unwrap_or(&1); let rms = (sq_sum / *count as f32).sqrt(); - + // Smooth decay could be implemented here, but for now just raw RMS peer_levels.insert(id, rms); } }, err_fn, - None + None, ) - }, + } _ => { tracing::error!("Output device does not support F32 samples"); return; @@ -468,7 +544,7 @@ fn run_playback_loop( if let Err(e) = s.play() { tracing::error!("Failed to play output stream: {}", e); } - + while running.load(Ordering::Relaxed) { thread::sleep(Duration::from_millis(100)); } @@ -484,7 +560,9 @@ async fn run_network_sender( running: Arc, mic_bitrate: Arc, ) { - if peers.is_empty() { return; } + if peers.is_empty() { + return; + } // Initialize connections let mut connections = Vec::new(); @@ -502,9 +580,11 @@ async fn run_network_sender( let mut encoder = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) .expect("Failed to create Opus encoder"); - + // Initial bitrate - let _ = encoder.set_bitrate(Bitrate::BitsPerSecond(mic_bitrate.load(Ordering::Relaxed) as i32)); + let _ = encoder.set_bitrate(Bitrate::BitsPerSecond( + mic_bitrate.load(Ordering::Relaxed) as i32 + )); let mut pcm_buffer: Vec = Vec::with_capacity(FRAME_SIZE_SAMPLES * 2); let mut opus_buffer = vec![0u8; 1500]; @@ -522,18 +602,18 @@ async fn run_network_sender( while pcm_buffer.len() >= FRAME_SIZE_SAMPLES { let chunk: Vec = pcm_buffer.drain(0..FRAME_SIZE_SAMPLES).collect(); - + match encoder.encode_float(&chunk, &mut opus_buffer) { Ok(len) => { let opus_packet = &opus_buffer[..len]; - + // Construct Datagram: [TYPE=1][OPUS] let mut datagram = Vec::with_capacity(1 + len); datagram.push(PACKET_TYPE_AUDIO); datagram.extend_from_slice(opus_packet); - + let bytes = Bytes::from(datagram); - + // Send to all peers for conn in &mut connections { if let Err(e) = conn.send_datagram(bytes.clone()) { diff --git a/src/net/mod.rs b/src/net/mod.rs index cc5fbcb..9f67c3a 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -73,6 +73,8 @@ pub struct PeerInfo { pub is_self: bool, #[serde(skip)] pub audio_level: f32, + #[serde(skip)] + pub is_streaming_video: bool, } /// Manages the iroh networking stack. @@ -207,6 +209,7 @@ impl NetworkManager { capabilities: None, is_self: false, audio_level: 0.0, + is_streaming_video: false, }); } let _ = event_tx.send(NetEvent::PeerUp(peer_id)).await; diff --git a/src/web/mod.rs b/src/web/mod.rs index 7388fd0..80ad3ac 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -64,9 +64,11 @@ pub async fn start_web_server( axum::serve(listener, app).await.unwrap(); } - // --- AUDIO --- -async fn ws_audio_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { +async fn ws_audio_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_audio_socket(socket, state)) } @@ -77,7 +79,11 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) { // Outgoing (Server -> Browser) tokio::spawn(async move { while let Ok(event) = rx.recv().await { - if let WebMediaEvent::Audio { peer_id, data: samples } = event { + if let WebMediaEvent::Audio { + peer_id, + data: samples, + } = event + { // Protocol: [IDLen] [ID] [f32...] let id_bytes = peer_id.as_bytes(); let id_len = id_bytes.len() as u8; @@ -87,7 +93,11 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) { for s in samples { payload.extend_from_slice(&s.to_ne_bytes()); } - if sender.send(Message::Binary(Bytes::from(payload))).await.is_err() { + if sender + .send(Message::Binary(Bytes::from(payload))) + .await + .is_err() + { break; } } @@ -97,7 +107,7 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) { // Incoming (Browser -> Server) while let Some(msg) = receiver.next().await { if let Ok(Message::Binary(data)) = msg { - // Protocol: [f32...] + // Protocol: [f32...] // (We dropped the header byte 3) if data.len() % 4 == 0 { let samples: Vec = data @@ -111,7 +121,10 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) { } // --- SCREEN --- -async fn ws_screen_handler(ws: WebSocketUpgrade, State(state): State) -> impl IntoResponse { +async fn ws_screen_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_screen_socket(socket, state)) } @@ -122,7 +135,12 @@ async fn handle_screen_socket(socket: WebSocket, state: AppState) { // Outgoing (Server -> Browser) tokio::spawn(async move { while let Ok(event) = rx.recv().await { - if let WebMediaEvent::Video { peer_id, kind, data } = event { + if let WebMediaEvent::Video { + peer_id, + kind, + data, + } = event + { if matches!(kind, MediaKind::Screen) { let id_bytes = peer_id.as_bytes(); let id_len = id_bytes.len() as u8; @@ -131,11 +149,15 @@ async fn handle_screen_socket(socket: WebSocket, state: AppState) { payload.extend_from_slice(id_bytes); payload.extend_from_slice(&data); - if sender.send(Message::Binary(Bytes::from(payload))).await.is_err() { + if sender + .send(Message::Binary(Bytes::from(payload))) + .await + .is_err() + { break; } } - } + } } }); diff --git a/tests/tests.rs b/tests/tests.rs index 60dfc9e..9e0ba7b 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -290,8 +290,9 @@ screen_resolution = "1920x1080" // ============================================================================ mod tui_tests { use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; + use p2p_chat::app_logic::AppCommand; use p2p_chat::config::{Theme, UiConfig}; - use p2p_chat::tui::{App, InputMode, TuiCommand}; + use p2p_chat::tui::{App, InputMode}; fn make_app() -> App { let theme: Theme = UiConfig::default().into(); @@ -320,7 +321,7 @@ mod tui_tests { let mut app = make_app(); assert_eq!(app.input_mode, InputMode::Editing); let cmd = app.handle_key(key(KeyCode::Esc)); - assert!(matches!(cmd, TuiCommand::None)); + assert!(matches!(cmd, AppCommand::None)); assert_eq!(app.input_mode, InputMode::Normal); } @@ -329,7 +330,7 @@ mod tui_tests { let mut app = make_app(); app.input_mode = InputMode::Normal; let cmd = app.handle_key(key(KeyCode::Char('i'))); - assert!(matches!(cmd, TuiCommand::None)); + assert!(matches!(cmd, AppCommand::None)); assert_eq!(app.input_mode, InputMode::Editing); } @@ -338,7 +339,7 @@ mod tui_tests { let mut app = make_app(); app.input_mode = InputMode::Normal; let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::None)); + assert!(matches!(cmd, AppCommand::None)); assert_eq!(app.input_mode, InputMode::Editing); } @@ -347,7 +348,7 @@ mod tui_tests { let mut app = make_app(); app.input_mode = InputMode::Normal; let cmd = app.handle_key(key(KeyCode::Char('/'))); - assert!(matches!(cmd, TuiCommand::None)); + assert!(matches!(cmd, AppCommand::None)); assert_eq!(app.input_mode, InputMode::Editing); assert_eq!(app.input, "/"); assert_eq!(app.cursor_position, 1); @@ -358,7 +359,7 @@ mod tui_tests { let mut app = make_app(); app.input_mode = InputMode::Normal; let cmd = app.handle_key(key(KeyCode::Char('q'))); - assert!(matches!(cmd, TuiCommand::Quit)); + assert!(matches!(cmd, AppCommand::Quit)); } #[test] @@ -388,7 +389,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char('h'))); app.handle_key(key(KeyCode::Char('i'))); let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::SendMessage(ref s) if s == "hi")); + assert!(matches!(cmd, AppCommand::SendMessage(ref s) if s == "hi")); assert_eq!(app.input, ""); assert_eq!(app.cursor_position, 0); } @@ -397,7 +398,7 @@ mod tui_tests { fn enter_on_empty_input_does_nothing() { let mut app = make_app(); let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::None)); + assert!(matches!(cmd, AppCommand::None)); } #[test] @@ -407,7 +408,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::Quit)); + assert!(matches!(cmd, AppCommand::Quit)); } #[test] @@ -417,7 +418,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::SystemMessage(_))); + assert!(matches!(cmd, AppCommand::SystemMessage(_))); } #[test] @@ -427,7 +428,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::SystemMessage(_))); + assert!(matches!(cmd, AppCommand::SystemMessage(_))); } #[test] @@ -437,7 +438,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::ChangeNick(ref s) if s == "alice")); + assert!(matches!(cmd, AppCommand::ChangeNick(ref s) if s == "alice")); } #[test] @@ -447,7 +448,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::SystemMessage(_))); + assert!(matches!(cmd, AppCommand::SystemMessage(_))); } #[test] @@ -457,7 +458,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::Connect(ref s) if s == "abc123")); + assert!(matches!(cmd, AppCommand::Connect(ref s) if s == "abc123")); } #[test] @@ -467,17 +468,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::ToggleVoice)); - } - - #[test] - fn camera_command() { - let mut app = make_app(); - for c in "/camera".chars() { - app.handle_key(key(KeyCode::Char(c))); - } - let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::ToggleCamera)); + assert!(matches!(cmd, AppCommand::ToggleVoice)); } #[test] @@ -487,7 +478,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::ToggleScreen)); + assert!(matches!(cmd, AppCommand::ToggleScreen)); } #[test] @@ -497,7 +488,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::Leave)); + assert!(matches!(cmd, AppCommand::Leave)); } #[test] @@ -507,7 +498,7 @@ mod tui_tests { app.handle_key(key(KeyCode::Char(c))); } let cmd = app.handle_key(key(KeyCode::Enter)); - assert!(matches!(cmd, TuiCommand::SystemMessage(_))); + assert!(matches!(cmd, AppCommand::SystemMessage(_))); } #[test] @@ -518,7 +509,7 @@ mod tui_tests { } let cmd = app.handle_key(key(KeyCode::Enter)); match cmd { - TuiCommand::SendFile(path) => { + AppCommand::SendFile(path) => { assert_eq!(path.to_str().unwrap(), "/tmp/test.txt"); } _ => panic!("Expected SendFile, got {:?}", cmd),