This commit is contained in:
mixa
2026-02-19 07:49:31 +03:00
parent 6f13bb03aa
commit 78e1efeaea
17 changed files with 2377 additions and 858 deletions

83
AGENTS.md Normal file
View File

@@ -0,0 +1,83 @@
# P2P Chat Codebase Guide for Agents
This document outlines the development workflow, code style, and architectural patterns for the `p2p-chat` repository.
## 1. Build, Test, and Run Commands
### Basic Commands
* **Build**: `cargo build`
* **Run TUI (default)**: `cargo run`
* **Run GUI**: `cargo run -- --gui`
* **Check**: `cargo check`
* **Format**: `cargo fmt`
* **Lint**: `cargo clippy`
### Testing
* **Run all tests**: `cargo test`
* **Run a specific test**: `cargo test test_name`
* **Run tests with output**: `cargo test -- --nocapture`
### Debugging
* **Logs**:
* TUI mode logs to `p2p-chat.log` in the working directory.
* GUI mode logs to `stdout` (INFO/DEBUG) and `stderr` (WARN/ERROR).
* **Environment Variables**:
* `RUST_LOG`: Control logging levels (e.g., `RUST_LOG=p2p_chat=debug,iroh=info`).
## 2. Code Style & Architecture
### General Rust Guidelines
* **Edition**: Rust 2021.
* **Formatting**: Strictly follow `rustfmt`.
* **Error Handling**: Use `anyhow::Result` for application-level errors and main functions. Use specific `thiserror` enums for libraries/modules when precise error handling is required.
* **Async/Await**: The project is built on the `tokio` runtime. Use `async/await` for I/O-bound tasks.
### Project Structure
* `src/main.rs`: Application entry point, runtime setup, and main event loop.
* `src/app_logic.rs`: Core business logic (`AppLogic` struct). Handles network events and application commands. It is agnostic of the UI (TUI vs. GUI).
* `src/gui.rs`: Iced-based GUI implementation following the Elm architecture (Model-View-Update).
* `src/tui/`: Ratatui-based TUI implementation.
* `src/net/`: Networking layer wrapping `iroh` and `iroh-gossip`.
* `src/media/`: Audio/Video capture and playback (GStreamer, cpal, FFmpeg).
* `src/protocol/`: Data structures and serialization (`serde`, `bincode`, `postcard`) for network messages.
### Naming Conventions
* **Variables/Functions**: `snake_case`.
* **Types/Traits**: `CamelCase`.
* **Constants**: `SCREAMING_SNAKE_CASE`.
* **Files**: `snake_case.rs`.
### Architectural Patterns
1. **State Management**:
* `AppLogic` holds the source of truth for application state (`ChatState`, `MediaState`, `NetworkManager`, etc.).
* `FrontendState` is a simplified struct derived from `AppLogic` to pass data to the UI (GUI/TUI) for rendering.
* Do not put core business logic inside `gui.rs` or `tui/`.
2. **Concurrency**:
* Use `tokio::spawn` for background tasks.
* Use `tokio::sync::mpsc` channels for communicating between the UI and the backend logic.
* Use `tokio::sync::broadcast` for one-to-many event distribution (e.g., video frames).
* Use `Arc<Mutex<...>>` or `Arc<Atomic...>` for shared state when channels are insufficient, but prefer message passing.
3. **GUI (Iced)**:
* **Messages**: Define all UI interactions in the `Message` enum in `src/gui.rs`.
* **Update**: Handle `Message`s in the `update` function.
* **View**: Keep the `view` function strictly for rendering based on `self.state`.
* **Subscriptions**: Use `subscription` to listen for external events (e.g., backend state updates, video frames).
4. **Networking**:
* Based on `iroh` QUIC.
* Events are received in the main loop and processed by `AppLogic::handle_net_event`.
### Dependencies
* **Networking**: `iroh`, `iroh-gossip`.
* **Runtime**: `tokio`.
* **UI**: `iced` (GUI), `ratatui` + `crossterm` (TUI).
* **Media**: `gstreamer` (video capture), `cpal` (audio I/O), `ffmpeg-next` (video encoding).
### Common Tasks
* **Adding a new feature**:
1. Update `protocol/mod.rs` if it involves new network messages.
2. Update `AppLogic` to handle the logic.
3. Update `FrontendState` to expose data to the UI.
4. Update `gui.rs` and `tui/` to render the new state.

330
Cargo.lock generated
View File

@@ -335,6 +335,24 @@ dependencies = [
"zbus 4.4.0",
]
[[package]]
name = "ashpd"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d43c03d9e36dd40cab48435be0b09646da362c278223ca535493877b2c1dee9"
dependencies = [
"async-fs",
"async-net",
"enumflags2",
"futures-channel",
"futures-util",
"rand 0.8.5",
"serde",
"serde_repr",
"url",
"zbus 4.4.0",
]
[[package]]
name = "async-broadcast"
version = "0.7.2"
@@ -536,6 +554,12 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atomic_refcell"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c"
[[package]]
name = "attohttpc"
version = "0.30.1"
@@ -617,7 +641,7 @@ dependencies = [
"log",
"num-rational",
"num-traits",
"pastey",
"pastey 0.1.1",
"rayon",
"thiserror 2.0.18",
"v_frame",
@@ -2986,6 +3010,16 @@ dependencies = [
"polyval",
]
[[package]]
name = "gif"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae047235e33e2829703574b54fdec96bfbad892062d97fed2f76022287de61b"
dependencies = [
"color_quant",
"weezl",
]
[[package]]
name = "gif"
version = "0.14.1"
@@ -2996,6 +3030,19 @@ dependencies = [
"weezl",
]
[[package]]
name = "gio-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0071fe88dba8e40086c8ff9bbb62622999f49628344b1d1bf490a48a29d80f22"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"windows-sys 0.61.2",
]
[[package]]
name = "gl"
version = "0.14.0"
@@ -3022,6 +3069,50 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "151665d9be52f9bb40fc7966565d39666f2d1e69233571b71b87791c7e0528b3"
[[package]]
name = "glib"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16de123c2e6c90ce3b573b7330de19be649080ec612033d397d72da265f1bd8b"
dependencies = [
"bitflags 2.10.0",
"futures-channel",
"futures-core",
"futures-executor",
"futures-task",
"futures-util",
"gio-sys",
"glib-macros",
"glib-sys",
"gobject-sys",
"libc",
"memchr",
"smallvec",
]
[[package]]
name = "glib-macros"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf59b675301228a696fe01c3073974643365080a76cc3ed5bc2cbc466ad87f17"
dependencies = [
"heck 0.5.0",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "glib-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d95e1a3a19ae464a7286e14af9a90683c64d70c02532d88d87ce95056af3e6c"
dependencies = [
"libc",
"system-deps",
]
[[package]]
name = "glob"
version = "0.3.3"
@@ -3061,6 +3152,17 @@ dependencies = [
"gl_generator",
]
[[package]]
name = "gobject-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dca35da0d19a18f4575f3cb99fe1c9e029a2941af5662f326f738a21edaf294"
dependencies = [
"glib-sys",
"libc",
"system-deps",
]
[[package]]
name = "gpu-alloc"
version = "0.6.0"
@@ -3113,6 +3215,129 @@ dependencies = [
"bitflags 2.10.0",
]
[[package]]
name = "gstreamer"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bed73742c5d54cb48533be608b67d89f96e1ebbba280be7823f1ef995e3a9d7"
dependencies = [
"cfg-if",
"futures-channel",
"futures-core",
"futures-util",
"glib",
"gstreamer-sys",
"itertools 0.14.0",
"kstring",
"libc",
"muldiv",
"num-integer",
"num-rational",
"option-operations",
"pastey 0.2.1",
"pin-project-lite",
"smallvec",
"thiserror 2.0.18",
]
[[package]]
name = "gstreamer-app"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "895753fb0f976693f321e6b9d68f746ef9095f1a5b8277c11d85d807a949fbfc"
dependencies = [
"futures-core",
"futures-sink",
"glib",
"gstreamer",
"gstreamer-app-sys",
"gstreamer-base",
"libc",
]
[[package]]
name = "gstreamer-app-sys"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7719cee28afda1a48ab1ee93769628bd0653d3c5be1923bce9a8a4550fcc980"
dependencies = [
"glib-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-base"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd15c7e37d306573766834a5cbdd8ee711265f217b060f40a9a8eda45298488"
dependencies = [
"atomic_refcell",
"cfg-if",
"glib",
"gstreamer",
"gstreamer-base-sys",
"libc",
]
[[package]]
name = "gstreamer-base-sys"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27a2eda2c61e13c11883bf19b290d07ea6b53d04fd8bfeb7af64b6006c6c9ee6"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-sys"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d88630697e757c319e7bcec7b13919ba80492532dd3238481c1c4eee05d4904"
dependencies = [
"cfg-if",
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
]
[[package]]
name = "gstreamer-video"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33987f6a6a99750a07b0341d6288bac89b9b301be4672a209935203d4608d547"
dependencies = [
"cfg-if",
"futures-channel",
"glib",
"gstreamer",
"gstreamer-base",
"gstreamer-video-sys",
"libc",
"thiserror 2.0.18",
]
[[package]]
name = "gstreamer-video-sys"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a00c28faad96cd40a7b7592433051199691b131b08f622ed5d51c54e049792d3"
dependencies = [
"glib-sys",
"gobject-sys",
"gstreamer-base-sys",
"gstreamer-sys",
"libc",
"system-deps",
]
[[package]]
name = "guillotiere"
version = "0.6.2"
@@ -3585,6 +3810,7 @@ dependencies = [
"iced_renderer",
"iced_widget",
"iced_winit",
"image 0.24.9",
"thiserror 1.0.69",
]
@@ -3618,6 +3844,7 @@ dependencies = [
"iced_core",
"log",
"rustc-hash 2.1.1",
"tokio",
"wasm-bindgen-futures",
"wasm-timer",
]
@@ -3647,6 +3874,8 @@ dependencies = [
"half",
"iced_core",
"iced_futures",
"image 0.24.9",
"kamadak-exif",
"log",
"once_cell",
"raw-window-handle",
@@ -3887,6 +4116,24 @@ dependencies = [
"xmltree",
]
[[package]]
name = "image"
version = "0.24.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5690139d2f55868e080017335e4b94cb7414274c74f1669c84fb5feba2c9f69d"
dependencies = [
"bytemuck",
"byteorder",
"color_quant",
"exr",
"gif 0.13.3",
"jpeg-decoder",
"num-traits",
"png 0.17.16",
"qoi",
"tiff 0.9.1",
]
[[package]]
name = "image"
version = "0.25.9"
@@ -3897,7 +4144,7 @@ dependencies = [
"byteorder-lite",
"color_quant",
"exr",
"gif",
"gif 0.14.1",
"image-webp",
"moxcms",
"num-traits",
@@ -3906,7 +4153,7 @@ dependencies = [
"ravif",
"rayon",
"rgb",
"tiff",
"tiff 0.10.3",
"zune-core 0.5.1",
"zune-jpeg 0.5.12",
]
@@ -4379,6 +4626,15 @@ dependencies = [
"libc",
]
[[package]]
name = "jpeg-decoder"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07"
dependencies = [
"rayon",
]
[[package]]
name = "js-sys"
version = "0.3.85"
@@ -4389,6 +4645,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kamadak-exif"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef4fc70d0ab7e5b6bafa30216a6b48705ea964cdfc29c050f2412295eba58077"
dependencies = [
"mutate_once",
]
[[package]]
name = "kasuari"
version = "0.4.11"
@@ -4417,6 +4682,15 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc"
[[package]]
name = "kstring"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558bf9508a558512042d3095138b1f7b8fe90c5467d94f9f1da28b3731c5dbd1"
dependencies = [
"static_assertions",
]
[[package]]
name = "kurbo"
version = "0.10.4"
@@ -4535,7 +4809,7 @@ dependencies = [
"drm",
"gbm",
"gl",
"image",
"image 0.25.9",
"khronos-egl",
"memmap2",
"rustix 1.1.3",
@@ -4834,6 +5108,18 @@ dependencies = [
"pxfm",
]
[[package]]
name = "muldiv"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0"
[[package]]
name = "mutate_once"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d2233c9842d08cfe13f9eac96e207ca6a2ea10b80259ebe8ad0268be27d2af"
[[package]]
name = "n0-error"
version = "0.1.3"
@@ -5870,6 +6156,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "option-operations"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aca39cf52b03268400c16eeb9b56382ea3c3353409309b63f5c8f0b1faf42754"
dependencies = [
"pastey 0.2.1",
]
[[package]]
name = "orbclient"
version = "0.3.50"
@@ -5938,6 +6233,7 @@ name = "p2p-chat"
version = "0.1.0"
dependencies = [
"anyhow",
"ashpd 0.9.2",
"audiopus 0.2.0",
"axum",
"bincode",
@@ -5950,10 +6246,13 @@ dependencies = [
"dashmap",
"directories",
"futures",
"gstreamer",
"gstreamer-app",
"gstreamer-video",
"hex",
"iced",
"iced_futures",
"image",
"image 0.25.9",
"iroh",
"iroh-gossip",
"mime_guess",
@@ -6079,6 +6378,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35fb2e5f958ec131621fdd531e9fc186ed768cbe395337403ae56c17a74c68ec"
[[package]]
name = "pastey"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec"
[[package]]
name = "patricia_tree"
version = "0.8.0"
@@ -7126,7 +7431,7 @@ version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25a73a7337fc24366edfca76ec521f51877b114e42dab584008209cca6719251"
dependencies = [
"ashpd",
"ashpd 0.8.1",
"block",
"dispatch",
"js-sys",
@@ -8594,6 +8899,17 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "tiff"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e"
dependencies = [
"flate2",
"jpeg-decoder",
"weezl",
]
[[package]]
name = "tiff"
version = "0.10.3"
@@ -10746,7 +11062,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b65b5b36b7abe56a64f6dd46e55e26ca1ed5937a84f57fc86e8f2dd37b136f1"
dependencies = [
"dispatch2",
"image",
"image 0.25.9",
"lazy_static",
"libwayshot-xcap",
"log",

View File

@@ -42,7 +42,7 @@ directories = "5.0"
songbird = { version = "0.4", features = ["builtin-queue"] }
audiopus = "0.2"
rfd = "0.14"
iced = "0.13"
iced = { version = "0.13", features = ["image", "wgpu", "tokio"] }
iced_futures = "0.13"
crossbeam-channel = "0.5"
@@ -55,10 +55,14 @@ mime_guess = "2.0.5"
hex = "0.4.3"
cpal = { version = "0.17.1", features = ["jack"] }
xcap = "0.8.2"
ashpd = "0.9"
image = "0.25.9"
ringbuf = "0.4.8"
nnnoiseless = "0.5"
dashmap = "5"
gstreamer = "0.24.4"
gstreamer-app = "0.24.4"
gstreamer-video = "0.24.4"
[profile.dev]
opt-level = 0

View File

@@ -1,6 +1,6 @@
use crate::chat::{ChatEntry, ChatState};
use crate::config::AppConfig;
use crate::file_transfer::FileTransferManager;
use crate::file_transfer::{FileTransferManager, TransferInfo};
use crate::media::MediaState;
use crate::net::{NetEvent, NetworkManager, PeerInfo};
use crate::protocol::{self, GossipMessage};
@@ -11,6 +11,7 @@ use std::path::PathBuf;
pub struct FrontendState {
pub chat_history: Vec<ChatEntry>,
pub peers: Vec<PeerInfo>,
pub transfers: Vec<TransferInfo>,
pub our_name: String,
pub our_id: String,
pub our_id_full: String,
@@ -18,6 +19,7 @@ pub struct FrontendState {
pub input_device_name: Option<String>,
pub output_device_name: Option<String>,
pub master_volume: f32,
pub mic_volume: f32,
pub noise_suppression: bool,
}
@@ -26,6 +28,7 @@ impl Default for FrontendState {
Self {
chat_history: Vec::new(),
peers: Vec::new(),
transfers: Vec::new(),
our_name: "Unknown".to_string(),
our_id: "".to_string(),
our_id_full: "".to_string(),
@@ -33,6 +36,7 @@ impl Default for FrontendState {
input_device_name: None,
output_device_name: None,
master_volume: 1.0,
mic_volume: 1.0,
noise_suppression: true,
}
}
@@ -52,6 +56,7 @@ pub enum AppCommand {
SetInputDevice(String),
SetOutputDevice(String),
SetMasterVolume(f32),
SetMicVolume(f32),
ToggleNoiseCancel,
SetBitrate(u32),
@@ -68,6 +73,8 @@ pub struct AppLogic {
pub our_name: String,
pub our_id_short: String,
pub connected: bool,
pub media_event_rx: tokio::sync::mpsc::Receiver<crate::media::InternalMediaEvent>,
pub net_event_rx: tokio::sync::mpsc::Receiver<crate::net::NetEvent>,
}
impl AppLogic {
@@ -78,6 +85,8 @@ impl AppLogic {
net: NetworkManager,
our_name: String,
our_id_short: String,
media_event_rx: tokio::sync::mpsc::Receiver<crate::media::InternalMediaEvent>,
net_event_rx: tokio::sync::mpsc::Receiver<crate::net::NetEvent>,
) -> Self {
Self {
chat,
@@ -87,6 +96,8 @@ impl AppLogic {
our_name,
our_id_short,
connected: false,
media_event_rx,
net_event_rx,
}
}
@@ -110,7 +121,8 @@ impl AppLogic {
}
AppCommand::SetInputDevice(device_name) => {
self.media.set_input_device(device_name.clone());
self.chat.add_system_message(format!("Microphone set to: {}", device_name));
self.chat
.add_system_message(format!("Microphone set to: {}", device_name));
if let Ok(mut cfg) = AppConfig::load() {
cfg.media.input_device = Some(device_name);
let _ = cfg.save();
@@ -118,7 +130,8 @@ impl AppLogic {
}
AppCommand::SetOutputDevice(device_name) => {
self.media.set_output_device(device_name.clone());
self.chat.add_system_message(format!("Output set to: {}", device_name));
self.chat
.add_system_message(format!("Output set to: {}", device_name));
if let Ok(mut cfg) = AppConfig::load() {
cfg.media.output_device = Some(device_name);
let _ = cfg.save();
@@ -131,16 +144,25 @@ impl AppLogic {
let _ = cfg.save();
}
}
AppCommand::SetMicVolume(vol) => {
self.media.set_mic_volume(vol);
if let Ok(mut cfg) = AppConfig::load() {
cfg.media.mic_volume = vol;
let _ = cfg.save();
}
}
AppCommand::ToggleNoiseCancel => {
if let Some(enabled) = self.media.toggle_denoise() {
let status = if enabled { "enabled" } else { "disabled" };
self.chat.add_system_message(format!("Noise cancellation {}", status));
self.chat
.add_system_message(format!("Noise cancellation {}", status));
if let Ok(mut cfg) = AppConfig::load() {
cfg.media.noise_suppression = enabled;
let _ = cfg.save();
}
} else {
self.chat.add_system_message("Voice chat not active".to_string());
self.chat
.add_system_message("Voice chat not active".to_string());
}
}
AppCommand::Quit => {
@@ -510,17 +532,29 @@ impl AppLogic {
let mut peers: Vec<PeerInfo> = peers_map.values().cloned().collect();
peers.sort_by(|a, b| a.id.to_string().cmp(&b.id.to_string()));
// Sync audio levels
// Sync audio levels and video status
let levels = self.media.get_peer_levels();
let video_sessions = &self.media.active_video_sessions;
for peer in &mut peers {
if let Some(level) = levels.get(&peer.id) {
peer.audio_level = *level;
}
if video_sessions.contains(&peer.id) {
peer.is_streaming_video = true;
} else {
peer.is_streaming_video = false;
}
}
// Check timeouts/cleanups
self.file_mgr.check_timeouts();
let transfers = self.file_mgr.active_transfers();
FrontendState {
chat_history: self.chat.history.clone(),
peers,
transfers,
our_name: self.our_name.clone(),
our_id: self.our_id_short.clone(),
our_id_full: self.net.our_id.to_string(),
@@ -528,6 +562,7 @@ impl AppLogic {
input_device_name: self.media.input_device.clone(),
output_device_name: self.media.output_device.clone(),
master_volume: self.media.get_volume(),
mic_volume: self.media.get_mic_volume(),
noise_suppression: self.media.is_denoise_enabled(),
}
}

View File

@@ -67,6 +67,8 @@ pub struct MediaConfig {
pub output_device: Option<String>,
#[serde(default = "default_volume")]
pub master_volume: f32,
#[serde(default = "default_volume")]
pub mic_volume: f32,
#[serde(default = "default_true")]
pub noise_suppression: bool,
}
@@ -90,6 +92,7 @@ impl Default for MediaConfig {
input_device: None,
output_device: None,
master_volume: 1.0,
mic_volume: 1.0,
noise_suppression: true,
}
}

View File

@@ -197,10 +197,6 @@ impl FileTransferManager {
// `execute_send` logic needs a timeout on `decode_framed`.
}
/// Execute the sending side of a file transfer over a QUIC bi-stream.
#[allow(dead_code)]
pub async fn execute_send(

File diff suppressed because it is too large Load Diff

View File

@@ -64,11 +64,67 @@ struct Cli {
gui: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
// ... tracing init ...
// Initialize tracing to file (not stdout, since we use TUI)
let _tracing_guard = tracing_subscriber::fmt()
// Remove #[tokio::main] and use manual runtime builder
fn main() -> Result<()> {
// Setup runtime
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
// Block on async main logic
rt.block_on(async_main())
}
async fn async_main() -> Result<()> {
let cli = Cli::parse();
// Load config
let config = AppConfig::load().unwrap_or_else(|e| {
eprintln!("Warning: Failed to load config: {}", e);
AppConfig::default()
});
// Initialize tracing
if cli.gui {
// GUI Mode: Log to stdout (Info/Debug) and stderr (Warn/Error)
use tracing_subscriber::fmt::writer::MakeWriterExt;
let stdout = std::io::stdout.with_max_level(tracing::Level::INFO);
let stderr = std::io::stderr.with_min_level(tracing::Level::WARN);
// Combine them? tracing_subscriber doesn't easily support split writers in one layer without customization.
// But we can use the MakeWriter implementation pattern.
struct SplitWriter;
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SplitWriter {
type Writer = Box<dyn io::Write + Send + 'a>;
fn make_writer(&'a self) -> Self::Writer {
Box::new(io::stdout())
}
fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
if *meta.level() <= tracing::Level::WARN {
Box::new(io::stderr())
} else {
Box::new(io::stdout())
}
}
}
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("p2p_chat=debug".parse()?)
.add_directive("iroh=warn".parse()?)
.add_directive("iroh_gossip=warn".parse()?),
)
.with_writer(SplitWriter)
.with_ansi(true)
.init();
} else {
// TUI Mode: Log to file to avoid breaking UI
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("p2p_chat=debug".parse()?)
@@ -87,16 +143,7 @@ async fn main() -> Result<()> {
})
.with_ansi(false)
.init();
// Load config
let config = AppConfig::load().unwrap_or_else(|e| {
eprintln!("Warning: Failed to load config: {}", e);
AppConfig::default()
});
let cli = Cli::parse();
}
// Topic: CLI > Config > Default
let topic_str = cli
@@ -108,7 +155,7 @@ async fn main() -> Result<()> {
// ... networking init ...
// Initialize networking
let (mut net_mgr, _net_tx, mut net_rx) = NetworkManager::new(topic_bytes)
let (mut net_mgr, _net_tx, net_event_rx) = NetworkManager::new(topic_bytes)
.await
.context("Failed to start networking")?;
@@ -134,12 +181,16 @@ async fn main() -> Result<()> {
let file_mgr = FileTransferManager::new(download_path);
// Pass mic name from config if present
// Pass mic name from config if present
let (media_event_tx, media_event_rx) = tokio::sync::mpsc::channel(10);
let media = MediaState::new(
config.media.mic_bitrate,
config.media.input_device.clone(),
config.media.output_device.clone(),
config.media.master_volume,
config.media.mic_volume,
config.media.noise_suppression,
Some(media_event_tx),
);
// Initialize App with Theme
@@ -184,6 +235,7 @@ async fn main() -> Result<()> {
capabilities: None,
is_self: true,
audio_level: 0.0,
is_streaming_video: false,
},
);
}
@@ -205,8 +257,6 @@ async fn main() -> Result<()> {
);
}
// Start Web Interface
// Start Web Interface
// Start Web Interface
tokio::spawn(crate::web::start_web_server(
media.broadcast_tx.clone(),
@@ -222,6 +272,8 @@ async fn main() -> Result<()> {
net_mgr,
cli.name.clone(),
our_id_short,
media_event_rx,
net_event_rx,
);
if cli.gui {
@@ -232,55 +284,59 @@ async fn main() -> Result<()> {
// Channel for AppLogic -> GUI state updates
let (gui_state_tx, gui_state_rx) = mpsc::channel(100);
// Subscribe to video frames
let video_rx = app_logic.media.video_frame_tx.subscribe();
// Get initial state
let initial_state = app_logic.get_frontend_state().await;
// Spawn AppLogic loop
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
let loop_handle = tokio::spawn(async move {
loop {
let mut state_changed = false;
tokio::select! {
_ = interval.tick() => {
app_logic.file_mgr.check_timeouts();
Some(event) = app_logic.net_event_rx.recv() => {
app_logic.handle_net_event(event).await;
// Send state update to GUI
let state = app_logic.get_frontend_state().await;
let _ = gui_state_tx.send(state).await;
}
Some(cmd) = gui_cmd_rx.recv() => {
match app_logic.handle_command(cmd).await {
Ok(true) => { // Quit command
cmd_opt = gui_cmd_rx.recv() => {
match cmd_opt {
Some(cmd) => {
// Handle command from GUI
if let Ok(should_quit) = app_logic.handle_command(cmd).await {
// Update state even if quitting
let state = app_logic.get_frontend_state().await;
let _ = gui_state_tx.send(state).await;
if should_quit {
break;
}
Ok(false) => {
state_changed = true;
}
Err(e) => {
tracing::error!("Command error: {}", e);
}
}
}
Some(event) = net_rx.recv() => {
app_logic.handle_net_event(event).await;
state_changed = true;
}
Some(event) = gossip_event_rx.recv() => {
app_logic.handle_net_event(event).await;
state_changed = true;
}
_ = tokio::signal::ctrl_c() => {
None => {
// GUI channel closed, exit loop
break;
}
}
}
Some(_media_evt) = app_logic.media_event_rx.recv() => {
// Media state changed (video started/stopped), update GUI
let state = app_logic.get_frontend_state().await;
let _ = gui_state_tx.send(state).await;
}
}
}
if state_changed {
let new_state = app_logic.get_frontend_state().await;
if gui_state_tx.send(new_state).await.is_err() {
break;
}
}
}
// Shutdown logic
let _ = app_logic.net.shutdown().await;
// Offload entire app_logic shutdown to blocking thread to avoid async drop panics
let _ = tokio::task::spawn_blocking(move || {
// Shutdown media explicitly (stops threads)
app_logic.media.shutdown();
// app_logic is dropped here, on a blocking thread.
// This includes NetworkManager, ChatState, etc.
})
.await;
});
// Run GUI
@@ -288,9 +344,14 @@ async fn main() -> Result<()> {
initial_state,
command_sender: gui_cmd_tx,
state_receiver: gui_state_rx,
video_receiver: video_rx,
};
crate::gui::run(flags)?;
// Wait for background task to finish cleanup
let _ = loop_handle.await;
return Ok(());
}
@@ -306,7 +367,6 @@ async fn main() -> Result<()> {
&mut terminal,
&mut app,
&mut app_logic,
&mut net_rx,
&mut gossip_event_rx,
)
.await;
@@ -326,7 +386,6 @@ async fn run_event_loop(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
logic: &mut crate::app_logic::AppLogic,
net_rx: &mut mpsc::Receiver<NetEvent>,
gossip_rx: &mut mpsc::Receiver<NetEvent>,
) -> Result<()> {
let mut event_stream = EventStream::new();
@@ -386,7 +445,7 @@ async fn run_event_loop(
}
// Network events from file transfer acceptor
Some(event) = net_rx.recv() => {
Some(event) = logic.net_event_rx.recv() => {
logic.handle_net_event(event).await;
}
@@ -444,5 +503,3 @@ fn parse_topic(hex_str: &str) -> Result<[u8; 32]> {
Ok(bytes)
}
}

View File

@@ -0,0 +1,135 @@
use anyhow::Result;
use gstreamer::prelude::*;
use gstreamer::{Pipeline, State};
use gstreamer_app::{AppSink, AppSinkCallbacks};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub struct GStreamerCapture {
pipeline: Pipeline,
sink: AppSink,
}
impl GStreamerCapture {
pub fn new(pipeline_str: &str) -> Result<Self> {
// Initialize GStreamer
// We only need to init once, but calling it multiple times is safe.
// gstreamer::init() checks internally.
gstreamer::init()?;
// Create pipeline from string
let pipeline = gstreamer::parse::launch(pipeline_str)?
.downcast::<Pipeline>()
.map_err(|_| anyhow::anyhow!("Expected a pipeline"))?;
// Get the appsink
let sink = pipeline
.by_name("sink")
.ok_or_else(|| anyhow::anyhow!("Pipeline must have an appsink named 'sink'"))?
.downcast::<AppSink>()
.map_err(|_| anyhow::anyhow!("'sink' element is not an appsink"))?;
Ok(Self { pipeline, sink })
}
pub fn set_callback<F>(&self, on_sample: F)
where
F: Fn(&[u8]) + Send + Sync + 'static,
{
let count = Arc::new(AtomicUsize::new(0));
let callbacks = AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gstreamer::FlowError::Error)?;
let map = buffer
.map_readable()
.map_err(|_| gstreamer::FlowError::Error)?;
let c = count.fetch_add(1, Ordering::Relaxed);
if c % 10 == 0 {
tracing::info!("GStreamer captured frame #{}", c);
}
on_sample(map.as_slice());
Ok(gstreamer::FlowSuccess::Ok)
})
.build();
self.sink.set_callbacks(callbacks);
}
pub fn start(&self) -> Result<()> {
self.pipeline.set_state(State::Playing)?;
// Spawn a bus watcher
let bus = self.pipeline.bus().expect("Pipeline has no bus");
std::thread::spawn(move || {
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
use gstreamer::MessageView;
match msg.view() {
MessageView::Error(err) => {
tracing::error!("GStreamer Error: {} ({:?})", err.error(), err.debug());
break;
}
MessageView::Warning(warn) => {
tracing::warn!("GStreamer Warning: {} ({:?})", warn.error(), warn.debug());
}
MessageView::Eos(..) => {
tracing::info!("GStreamer EndOfStream");
break;
}
_ => (),
}
}
});
Ok(())
}
pub fn stop(&self) -> Result<()> {
let _ = self.pipeline.set_state(State::Null);
Ok(())
}
}
impl Drop for GStreamerCapture {
fn drop(&mut self) {
let _ = self.stop();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[test]
fn test_appsink_pattern() {
// Simple videotestsrc pipeline for testing
// videotestsrc ! videoconvert ! video/x-raw,format=I420 ! appsink name=sink
let pipeline_str = "videotestsrc num-buffers=10 ! videoconvert ! video/x-raw,format=I420 ! appsink name=sink";
let capture = GStreamerCapture::new(pipeline_str).expect("Failed to create capture");
let frame_count = Arc::new(Mutex::new(0));
let frame_count_clone = frame_count.clone();
capture.set_callback(move |data| {
let mut count = frame_count_clone.lock().unwrap();
*count += 1;
println!("Received frame of size: {}", data.len());
});
capture.start().expect("Failed to start");
// Wait for frames
std::thread::sleep(Duration::from_secs(2));
capture.stop().expect("Failed to stop");
let count = *frame_count.lock().unwrap();
assert!(count > 0, "Should have received at least one frame");
println!("Total frames received: {}", count);
}
}

View File

@@ -1,26 +1,20 @@
//! Video capture and playback using FFmpeg and MPV.
//!
//! Captures video by spawning an `ffmpeg` process and reading its stdout.
//! Plays video by spawning `mpv` (or `vlc`) and writing to its stdin.
pub mod gstreamer;
pub mod portal;
use anyhow::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing;
use crate::media::playback::VideoPlayer;
use crate::media::WebMediaEvent;
use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage};
use portal::{select_wayland_source, AshpdKeeper};
use std::process::Stdio;
use tokio::io::{AsyncReadExt, BufReader, AsyncWriteExt, AsyncBufReadExt};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use xcap::Monitor;
use ashpd::desktop::{PersistMode, screencast::{CursorMode, Screencast, SourceType}};
struct AshpdKeeper {
_proxy: Screencast<'static>,
_session: ashpd::desktop::Session<'static, Screencast<'static>>,
}
/// Manages a video capture session (camera or screen).
pub struct VideoCapture {
@@ -52,10 +46,7 @@ impl VideoCapture {
}));
}
Ok(Self {
running,
tasks,
})
Ok(Self { running, tasks })
}
/// Start web-based camera share (relay from Web to Peers).
@@ -81,10 +72,7 @@ impl VideoCapture {
}));
}
Ok(Self {
running,
tasks,
})
Ok(Self { running, tasks })
}
/// Start native video capture via FFmpeg.
@@ -109,20 +97,57 @@ impl VideoCapture {
let kind_clone = kind.clone();
tasks.push(tokio::spawn(async move {
// Set GST_DEBUG to see errors
std::env::set_var("GST_DEBUG", "3");
let result = if matches!(kind_clone, MediaKind::Screen) {
// Try Wayland Portal first
match select_wayland_source().await {
let wayland_attempt = match select_wayland_source().await {
Ok((node_id, keeper)) => {
tracing::info!("Selected Wayland source node: {}", node_id);
run_pipewire_capture(node_id, keeper, frame_tx_clone, broadcast_tx_clone, capture_running).await
// Try GStreamer PipeWire capture
match run_pipewire_capture(
node_id,
keeper,
frame_tx_clone.clone(),
broadcast_tx_clone.clone(),
capture_running.clone(),
)
.await
{
Ok(_) => Ok(()),
Err(e) => {
tracing::error!(
"GStreamer/PipeWire capture crashed: {}. Falling back to xcap.",
e
);
Err(e) // Signal to try fallback
}
}
}
Err(e) => {
tracing::warn!("Wayland source selection failed ({}). Falling back to xcap.", e);
run_xcap_capture(frame_tx_clone, broadcast_tx_clone, capture_running).await
tracing::warn!(
"Wayland source selection failed ({}). Falling back to xcap.",
e
);
Err(anyhow::anyhow!("Portal failed"))
}
};
// Fallback to xcap if Wayland failed (either portal or pipeline)
if wayland_attempt.is_err() {
run_xcap_capture(frame_tx_clone, broadcast_tx_clone, capture_running).await
} else {
Ok(())
}
} else {
run_ffmpeg_capture(kind_clone, frame_tx_clone, broadcast_tx_clone, capture_running).await
run_ffmpeg_capture(
kind_clone,
frame_tx_clone,
broadcast_tx_clone,
capture_running,
)
.await
};
if let Err(e) = result {
@@ -144,10 +169,7 @@ impl VideoCapture {
}));
}
Ok(Self {
running,
tasks,
})
Ok(Self { running, tasks })
}
/// Stop capture.
@@ -165,41 +187,29 @@ impl VideoCapture {
message: MediaStreamMessage,
mut recv: iroh::endpoint::RecvStream,
_broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
video_frame_tx: tokio::sync::broadcast::Sender<(Vec<u8>, u32, u32)>,
) -> Result<()> {
let kind = match message {
MediaStreamMessage::VideoStart { kind, .. } => kind,
_ => anyhow::bail!("Expected VideoStart"),
};
tracing::info!("Starting {:?} stream handler for {} (Native MPV)", kind, from);
tracing::info!(
"Starting {:?} stream handler for {} (Internal Player)",
kind,
from
);
// Spawn mpv
let mut cmd = Command::new("mpv");
cmd.args(&[
"--no-terminal",
"--ontop",
"--profile=low-latency",
"--cache=no",
"--force-window",
"-", // Read from stdin
]);
cmd.stdin(Stdio::piped());
// We might want to quell stdout/stderr or log them
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
// Initialize Internal Video Player (GStreamer)
let tx = video_frame_tx.clone();
let player = VideoPlayer::new(move |data, width, height| {
// Send frame to GUI
// We use a broadcast channel, so if GUI isn't listening (buffer full), we drop frames (good for live video)
let _ = tx.send((data, width, height));
})?;
// Ensure process is killed when this task drops
cmd.kill_on_drop(true);
player.start()?;
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to spawn mpv: {}", e);
return Err(anyhow::anyhow!("Failed to spawn mpv: {}", e));
}
};
let mut stdin = child.stdin.take().expect("Failed to open mpv stdin");
use tokio::io::AsyncWriteExt;
let mut packet_count = 0;
loop {
let msg: MediaStreamMessage = match decode_framed(&mut recv).await {
@@ -209,19 +219,20 @@ impl VideoCapture {
match msg {
MediaStreamMessage::VideoFrame { data, .. } => {
// Write directly to mpv stdin
// The data is already NAL units with start codes (from our capture logic)
// Note: 'data' from VideoFrame contains [1 byte type][NAL Unit including start code]
// We need to skip the first byte which is our protocol's frame type indicator (Key/Delta)
// Wait, let's check run_ffmpeg_capture.
// It does: payload.push(frame_type); payload.extend_from_slice(&nal_data);
// So yes, we need to skip the first byte.
if data.len() > 1 {
if let Err(e) = stdin.write_all(&data[1..]).await {
tracing::error!("Failed to write to mpv: {}", e);
// Skip 1 byte header (frame type)
let payload = &data[1..];
if let Err(e) = player.push_data(payload) {
tracing::error!("Failed to push data to player: {}", e);
break;
}
// Log activity occasionally
packet_count += 1;
if packet_count % 30 == 0 {
tracing::info!("Processed video packet #{}", packet_count);
}
}
}
MediaStreamMessage::VideoStop { .. } => {
@@ -232,7 +243,7 @@ impl VideoCapture {
}
}
let _ = child.kill().await;
player.stop()?;
Ok(())
}
}
@@ -260,11 +271,22 @@ async fn run_xcap_capture(
// Select first monitor for now
let monitor = &monitors[0];
let width = monitor.width().map_err(|e| anyhow::anyhow!("Failed to get monitor width: {}", e))?;
let height = monitor.height().map_err(|e| anyhow::anyhow!("Failed to get monitor height: {}", e))?;
let name = monitor.name().unwrap_or_else(|_| "Unknown Monitor".to_string());
let width = monitor
.width()
.map_err(|e| anyhow::anyhow!("Failed to get monitor width: {}", e))?;
let height = monitor
.height()
.map_err(|e| anyhow::anyhow!("Failed to get monitor height: {}", e))?;
let name = monitor
.name()
.unwrap_or_else(|_| "Unknown Monitor".to_string());
tracing::info!("Starting xcap capture on monitor: {} ({}x{})", name, width, height);
tracing::info!(
"Starting xcap capture on monitor: {} ({}x{})",
name,
width,
height
);
// 2. Spawn FFmpeg to encode raw frames
// We feed raw RGBA frames to stdin
@@ -272,23 +294,35 @@ async fn run_xcap_capture(
cmd.kill_on_drop(true);
cmd.args(&[
"-f", "rawvideo",
"-pixel_format", "rgba",
"-video_size", &format!("{}x{}", width, height),
"-framerate", "30",
"-i", "-", // Read raw frames from stdin
"-f",
"rawvideo",
"-pixel_format",
"rgba",
"-video_size",
&format!("{}x{}", width, height),
"-framerate",
"30",
"-i",
"-", // Read raw frames from stdin
]);
// Output args (same as before: HEVC NVENC/libx265 -> Raw stream)
cmd.args(&[
"-vf", "scale=1280:720", // Force 720p resize
"-c:v", "hevc_nvenc", // Try hardware first
"-vf",
"scale=1280:720", // Force 720p resize
"-c:v",
"hevc_nvenc", // Try hardware first
// Fallback or options...
"-b:v", "1M", // Lower bitrate to 1Mbps
"-g", "30", // Keyframe interval (GOP) 30
"-zerolatency", "1",
"-preset", "p4",
"-f", "hevc",
"-b:v",
"1M", // Lower bitrate to 1Mbps
"-g",
"30", // Keyframe interval (GOP) 30
"-zerolatency",
"1",
"-preset",
"p4",
"-f",
"hevc",
"-",
]);
@@ -357,16 +391,40 @@ async fn run_xcap_capture(
}
});
// 5. Read FFmpeg stdout and distribute (Same logic as run_ffmpeg_capture)
// 5. Read FFmpeg stdout and distribute
// stdout was already taken at line 297
// Define chosen_filter since we are using HEVC/H.265 in the hardcoded command above
// If you change the command to use H.264, change this to "h264_mp4toannexb"
let chosen_filter = "hevc_mp4toannexb";
process_ffmpeg_output(
stdout,
frame_tx,
broadcast_preview,
running,
chosen_filter.to_string(),
)
.await?;
let _ = child.kill().await;
stdin_task.abort();
Ok(())
}
async fn process_ffmpeg_output(
stdout: tokio::process::ChildStdout,
frame_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
broadcast_preview: tokio::sync::broadcast::Sender<WebMediaEvent>,
running: Arc<AtomicBool>,
chosen_filter: String,
) -> Result<()> {
let mut reader = BufReader::new(stdout);
let mut buffer = Vec::with_capacity(1024 * 1024);
let mut temp_buf = [0u8; 4096];
loop {
if !running.load(Ordering::Relaxed) {
break;
}
while running.load(Ordering::Relaxed) {
let n = match reader.read(&mut temp_buf).await {
Ok(0) => break, // EOF
Ok(n) => n,
@@ -395,13 +453,26 @@ async fn run_xcap_capture(
// data[start_code_len] is the NAL header.
let nal_header_byte = nal_data[start_code_len];
let is_key = if chosen_filter.contains("hevc") {
// HEVC (H.265)
// Type is bits 1-6 (0x7E) shifted right by 1.
let nal_type = (nal_header_byte & 0x7E) >> 1;
// HEVC Keyframes:
// 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT)
// 32-34: VPS, SPS, PPS (Parameters - treat as critical/key)
let is_key = (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34);
(nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34)
} else {
// H.264 (AVC)
// Type is lower 5 bits (0x1F)
let nal_type = nal_header_byte & 0x1F;
// H.264 Keyframes:
// 5: IDR (Instantaneous Decoding Refresh) - Keyframe
// 7: SPS (Sequence Parameter Set)
// 8: PPS (Picture Parameter Set)
nal_type == 5 || nal_type == 7 || nal_type == 8
};
let frame_type = if is_key { 0u8 } else { 1u8 };
@@ -417,9 +488,6 @@ async fn run_xcap_capture(
});
}
}
let _ = child.kill().await;
stdin_task.abort();
Ok(())
}
@@ -467,7 +535,13 @@ async fn run_video_sender_native(
break;
}
}
Err(_) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Video sender lagged by {} frames", n);
continue;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
let _ = write_framed(&mut send, &MediaStreamMessage::VideoStop { kind }).await;
@@ -497,10 +571,36 @@ async fn run_ffmpeg_capture(
}
let encoders = vec![
EncoderConfig {
name: "libx264 (Software Fallback)",
codec: "libx264",
opts: vec![
"-preset",
"ultrafast",
"-tune",
"zerolatency",
"-b:v",
"1M",
"-g",
"30",
],
format: "h264",
filter: "h264_mp4toannexb",
pixel_format: Some("yuv420p"), // libx264 often needs yuv420p
},
EncoderConfig {
name: "hevc_nvenc (Hardware)",
codec: "hevc_nvenc",
opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
opts: vec![
"-b:v",
"1M",
"-g",
"30",
"-zerolatency",
"1",
"-preset",
"p4",
],
format: "hevc",
filter: "hevc_mp4toannexb",
pixel_format: None, // NVENC usually handles formats well
@@ -508,19 +608,20 @@ async fn run_ffmpeg_capture(
EncoderConfig {
name: "h264_nvenc (Hardware Fallback)",
codec: "h264_nvenc",
opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
opts: vec![
"-b:v",
"1.5M",
"-g",
"30",
"-zerolatency",
"1",
"-preset",
"p4",
],
format: "h264",
filter: "h264_mp4toannexb",
pixel_format: None,
},
EncoderConfig {
name: "libx264 (Software Fallback)",
codec: "libx264",
opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"],
format: "h264",
filter: "h264_mp4toannexb",
pixel_format: Some("yuv420p"), // libx264 often needs yuv420p
},
];
let mut final_child = None;
@@ -538,10 +639,14 @@ async fn run_ffmpeg_capture(
match kind {
MediaKind::Camera => {
cmd.args(&[
"-f", "v4l2",
"-framerate", "30",
"-video_size", "1280x720",
"-i", "/dev/video0",
"-f",
"v4l2",
"-framerate",
"30",
"-video_size",
"1280x720",
"-i",
"/dev/video0",
]);
}
MediaKind::Screen => {
@@ -550,11 +655,16 @@ async fn run_ffmpeg_capture(
tracing::info!("Using x11grab on display: {}", display_env);
cmd.args(&[
"-f", "x11grab",
"-framerate", "30",
"-video_size", "1920x1080", // Input size (assuming 1080p for now, but safer to autodect or be large)
"-i", &display_env,
"-vf", "scale=1280:720", // Force 720p resize
"-f",
"x11grab",
"-framerate",
"30",
"-video_size",
"1920x1080", // Input size (assuming 1080p for now, but safer to autodect or be large)
"-i",
&display_env,
"-vf",
"scale=1280:720", // Force 720p resize
]);
}
_ => return Ok(()),
@@ -586,7 +696,11 @@ async fn run_ffmpeg_capture(
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
if let Ok(Some(status)) = child.try_wait() {
tracing::warn!("Encoder {} failed immediately with status: {}", enc.name, status);
tracing::warn!(
"Encoder {} failed immediately with status: {}",
enc.name,
status
);
// Read stderr to see why
if let Some(mut stderr) = child.stderr.take() {
let mut err_buf = String::new();
@@ -612,7 +726,10 @@ async fn run_ffmpeg_capture(
Ok(0) => break, // EOF
Ok(_) => {
// Log errors or critical warnings
if line.contains("Error") || line.contains("error") || line.contains("fail") {
if line.contains("Error")
|| line.contains("error")
|| line.contains("fail")
{
tracing::error!("FFmpeg: {}", line.trim());
}
line.clear();
@@ -643,115 +760,14 @@ async fn run_ffmpeg_capture(
};
let stdout = child.stdout.take().expect("Failed to open stdout");
// We don't need BufReader for raw check if we just read blocks, but fine to use it or just AsyncRead
let mut reader = BufReader::new(stdout);
// Raw H.264 parsing (Annex B)
// Stream is a sequence of NAL units, each starting with 00 00 00 01 (or 00 00 01)
// We need to buffer and split.
let mut buffer = Vec::with_capacity(1024 * 1024); // 1MB buffer
let mut temp_buf = [0u8; 4096];
while running.load(Ordering::Relaxed) {
let n = match reader.read(&mut temp_buf).await {
Ok(0) => break, // EOF
Ok(n) => n,
Err(_) => break,
};
buffer.extend_from_slice(&temp_buf[0..n]);
// Find NAL units in buffer
// A NAL unit starts with 00 00 00 01
// We look for start codes.
while let Some(start_idx) = find_start_code(&buffer) {
// If we found a start code at index 0, we can't extract a frame yet
// unless we have another start code later.
// But actually, the buffer MIGHT have multiple NALs.
// We need to find the NEXT start code to know where this one ends.
// Search from start_idx + 4
let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) {
next_start
} else {
// No next start code yet.
// If the buffer is getting huge, maybe we should just consume it?
// But for H.264 streaming we must be precise.
// Wait for more data.
break;
};
// Extract NAL unit (including start code? Browsers usually want it for AVC1/AnnexB)
// WebCodecs EncodedVideoChunk expects:
// "For 'avc1' (H.264), the chunk data must be an Annex B NAL unit."
// So we include the 00 00 00 01.
let nal_data = buffer.drain(start_idx..end_idx).collect::<Vec<u8>>();
// Send NAL
// Frame Type detection for H.264:
// NAL type is in the first byte AFTER the start code.
// Start Code is 00 00 00 01 (4 bytes) or 00 00 01 (3 bytes)
// find_start_code finds 00 00 00 01.
// Let's handle 3-byte start codes too? ffmpeg -f h264 usually sends 4-byte.
// Check if it's 3-byte or 4-byte start code
let start_code_len = if nal_data[2] == 1 { 3 } else { 4 };
// Construct payload
let mut payload: Vec<u8> = Vec::with_capacity(1 + nal_data.len());
// Check NAL type
// nal_data[start_code_len] is the NAL header (first byte).
let nal_header_byte = nal_data[start_code_len];
let is_key = if chosen_filter.contains("hevc") {
// HEVC (H.265)
// Type is bits 1-6 (0x7E) shifted right by 1.
let nal_type = (nal_header_byte & 0x7E) >> 1;
// HEVC Keyframes:
// 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT)
// 32-34: VPS, SPS, PPS (Parameters - treat as critical/key)
(nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34)
} else {
// H.264 (AVC)
// Type is lower 5 bits (0x1F)
let nal_type = nal_header_byte & 0x1F;
// H.264 Keyframes:
// 5: IDR (Instantaneous Decoding Refresh) - Keyframe
// 7: SPS (Sequence Parameter Set)
// 8: PPS (Picture Parameter Set)
nal_type == 5 || nal_type == 7 || nal_type == 8
};
let frame_type = if is_key { 0u8 } else { 1u8 };
payload.push(frame_type);
payload.extend_from_slice(&nal_data);
// Send to peers
let _ = frame_tx.send(payload.clone());
// Send to local web preview
let _ = broadcast_preview.send(WebMediaEvent::Video {
peer_id: "local".to_string(),
kind: kind.clone(),
data: payload,
});
// buffer now starts at what was end_idx (because of drain)
// drain removes items, so indexes shift.
// wait, drain(start..end) removes items. buffer automatically shrinks.
// so we loop again to see if there is ANOTHER start code at 0?
// Actually, `find_start_code` searches from 0.
// If we drained 0..end_idx, the next bytes are at 0.
}
}
process_ffmpeg_output(
stdout,
frame_tx,
broadcast_preview,
running,
chosen_filter.to_string(),
)
.await?;
let _ = child.kill().await;
Ok(())
@@ -762,10 +778,12 @@ fn find_start_code(data: &[u8]) -> Option<usize> {
}
fn find_start_code_from(data: &[u8], start: usize) -> Option<usize> {
if data.len() < 3 { return None; }
if data.len() < 3 {
return None;
}
for i in start..data.len() - 2 {
// Look for 00 00 01
if data[i] == 0 && data[i+1] == 0 && data[i+2] == 1 {
if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 {
// Check if it's actually 00 00 00 01 (4 bytes)
// If i > 0 and data[i-1] == 0, then the start code might have been at i-1
// But we iterate forward.
@@ -781,8 +799,8 @@ fn find_start_code_from(data: &[u8], start: usize) -> Option<usize> {
// If we find 00 00 01 at i, check if i > 0 and data[i-1] == 0.
// If so, the start code is at i-1 (4 bytes).
// Return i-1.
if i > start && data[i-1] == 0 {
return Some(i-1);
if i > start && data[i - 1] == 0 {
return Some(i - 1);
}
return Some(i);
}
@@ -794,31 +812,6 @@ fn find_start_code_from(data: &[u8], start: usize) -> Option<usize> {
// Wayland Capture Logic (ashpd)
// ---------------------------------------------------------------------------
async fn select_wayland_source() -> Result<(u32, AshpdKeeper)> {
let proxy = Screencast::new().await?;
let session = proxy.create_session().await?;
proxy
.select_sources(
&session,
CursorMode::Metadata,
SourceType::Monitor | SourceType::Window,
false,
None,
PersistMode::DoNot,
)
.await?;
let request = proxy.start(&session, &ashpd::WindowIdentifier::default()).await?;
let streams = request.response()?;
if let Some(stream) = streams.streams().iter().next() {
Ok((stream.pipe_wire_node_id(), AshpdKeeper { _proxy: proxy, _session: session }))
} else {
Err(anyhow::anyhow!("No pipewire stream returned from portal"))
}
}
async fn run_pipewire_capture(
node_id: u32,
_keeper: AshpdKeeper,
@@ -828,9 +821,26 @@ async fn run_pipewire_capture(
) -> Result<()> {
tracing::info!("Starting PipeWire capture for node: {}", node_id);
// Encoder Selection (Hardware preferred)
// Note: PipeWire input is raw, so we can encode it.
// Build pipeline: Robust ordering
// pipewiresrc -> videoconvert -> videoscale -> caps(720p) -> videorate -> caps(30fps) -> appsink
// We add caps *after* videoscale to enforce resolution, and *after* videorate to enforce fps.
let pipeline_str = format!(
"pipewiresrc path={} ! videoconvert ! videoscale ! video/x-raw,format=I420,width=1280,height=720 ! videorate ! video/x-raw,framerate=30/1 ! appsink name=sink sync=false drop=true max-buffers=1",
node_id
);
let capture = gstreamer::GStreamerCapture::new(&pipeline_str)?;
// Channel to receive frames from GStreamer
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
capture.set_callback(move |data| {
let _ = tx.send(data.to_vec());
});
capture.start()?;
// Encoder Selection (Hardware preferred)
struct EncoderConfig {
name: &'static str,
codec: &'static str,
@@ -841,16 +851,18 @@ async fn run_pipewire_capture(
let encoders = vec![
EncoderConfig {
name: "hevc_nvenc (Hardware)",
codec: "hevc_nvenc",
opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
format: "hevc",
filter: "hevc_mp4toannexb",
},
EncoderConfig {
name: "h264_nvenc (Hardware Fallback)",
codec: "h264_nvenc",
opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
name: "libx264 (Software Fallback)",
codec: "libx264",
opts: vec![
"-preset",
"ultrafast",
"-tune",
"zerolatency",
"-b:v",
"1M",
"-g",
"30",
],
format: "h264",
filter: "h264_mp4toannexb",
},
@@ -858,17 +870,47 @@ async fn run_pipewire_capture(
name: "hevc_vaapi (VAAPI HEVC)",
codec: "hevc_vaapi",
opts: vec![
"-vaapi_device", "/dev/dri/renderD128",
"-vf", "format=nv12,hwupload,scale_vaapi=w=1280:h=720",
"-b:v", "1M", "-g", "30"
"-vaapi_device",
"/dev/dri/renderD128",
"-vf",
"format=nv12,hwupload,scale_vaapi=w=1280:h=720",
"-b:v",
"1M",
"-g",
"30",
],
format: "hevc",
filter: "hevc_mp4toannexb",
},
EncoderConfig {
name: "libx264 (Software Fallback)",
codec: "libx264",
opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"],
name: "hevc_nvenc (Hardware)",
codec: "hevc_nvenc",
opts: vec![
"-b:v",
"1M",
"-g",
"30",
"-zerolatency",
"1",
"-preset",
"p4",
],
format: "hevc",
filter: "hevc_mp4toannexb",
},
EncoderConfig {
name: "h264_nvenc (Hardware Fallback)",
codec: "h264_nvenc",
opts: vec![
"-b:v",
"1.5M",
"-g",
"30",
"-zerolatency",
"1",
"-preset",
"p4",
],
format: "h264",
filter: "h264_mp4toannexb",
},
@@ -884,13 +926,13 @@ async fn run_pipewire_capture(
cmd.kill_on_drop(true);
cmd.args(&[
"-f", "pipewire",
"-framerate", "30",
"-i", &node_id.to_string(),
"-f", "rawvideo", "-pix_fmt", "yuv420p", "-s", "1280x720", "-r", "30", "-i", "-",
]);
if !enc.name.contains("vaapi") {
cmd.args(&["-vf", "scale=1280:720"]);
// Already scaled in GStreamer, but let's be safe or just pass through?
// If we rely on GStreamer scaling, we don't need scale filter here.
// But if VAAPI needs it...
}
cmd.arg("-c:v").arg(enc.codec);
@@ -899,6 +941,7 @@ async fn run_pipewire_capture(
cmd.arg("-f").arg(enc.format);
cmd.arg("-");
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
@@ -921,53 +964,51 @@ async fn run_pipewire_capture(
None => return Err(anyhow::anyhow!("All encoders failed for PipeWire")),
};
let mut stdin = child.stdin.take().expect("Failed to open ffmpeg stdin");
let stdout = child.stdout.take().expect("Failed to open stdout");
let mut reader = BufReader::new(stdout);
let mut buffer = Vec::with_capacity(1024 * 1024);
let mut temp_buf = [0u8; 4096];
while running.load(Ordering::Relaxed) {
let n = match reader.read(&mut temp_buf).await {
// Capture stderr for debugging
if let Some(stderr) = child.stderr.take() {
let running_log = running.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
while running_log.load(Ordering::Relaxed) {
match reader.read_line(&mut line).await {
Ok(0) => break,
Ok(n) => n,
Ok(_) => {
if line.contains("Error") || line.contains("error") || line.contains("fail")
{
tracing::error!("FFmpeg PipeWire Error: {}", line.trim());
}
line.clear();
}
Err(_) => break,
};
buffer.extend_from_slice(&temp_buf[0..n]);
while let Some(start_idx) = find_start_code(&buffer) {
let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) {
next_start
} else {
break;
};
let nal_data = buffer.drain(start_idx..end_idx).collect::<Vec<u8>>();
let start_code_len = if nal_data[2] == 1 { 3 } else { 4 };
let nal_header_byte = nal_data[start_code_len];
let is_key = if chosen_filter.contains("hevc") {
let nal_type = (nal_header_byte & 0x7E) >> 1;
(nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34)
} else {
let nal_type = nal_header_byte & 0x1F;
nal_type == 5 || nal_type == 7 || nal_type == 8
};
let frame_type = if is_key { 0u8 } else { 1u8 };
let mut payload = Vec::with_capacity(1 + nal_data.len());
payload.push(frame_type);
payload.extend_from_slice(&nal_data);
let _ = frame_tx.send(payload.clone());
let _ = broadcast_preview.send(WebMediaEvent::Video {
peer_id: "local".to_string(),
kind: MediaKind::Screen,
data: payload,
}
}
});
}
// Spawn async task to write frames to FFmpeg stdin
let stdin_task = tokio::spawn(async move {
while let Some(frame_data) = rx.recv().await {
if stdin.write_all(&frame_data).await.is_err() {
break;
}
}
});
process_ffmpeg_output(
stdout,
frame_tx,
broadcast_preview,
running,
chosen_filter.to_string(),
)
.await?;
let _ = child.kill().await;
stdin_task.abort();
let _ = capture.stop();
Ok(())
}

View File

@@ -0,0 +1,43 @@
use anyhow::Result;
use ashpd::desktop::{
screencast::{CursorMode, Screencast, SourceType},
PersistMode,
};
pub struct AshpdKeeper {
pub _proxy: Screencast<'static>,
pub _session: ashpd::desktop::Session<'static, Screencast<'static>>,
}
pub async fn select_wayland_source() -> Result<(u32, AshpdKeeper)> {
let proxy = Screencast::new().await?;
let session = proxy.create_session().await?;
proxy
.select_sources(
&session,
CursorMode::Embedded,
SourceType::Monitor | SourceType::Window,
false,
None,
PersistMode::DoNot,
)
.await?;
let request = proxy
.start(&session, &ashpd::WindowIdentifier::default())
.await?;
let streams = request.response()?;
if let Some(stream) = streams.streams().iter().next() {
Ok((
stream.pipe_wire_node_id(),
AshpdKeeper {
_proxy: proxy,
_session: session,
},
))
} else {
Err(anyhow::anyhow!("No pipewire stream returned from portal"))
}
}

View File

@@ -7,6 +7,7 @@
//! Each feature is runtime-toggleable and runs on dedicated threads/tasks.
pub mod capture;
pub mod playback;
pub mod voice;
use iroh::EndpointId;
@@ -36,6 +37,11 @@ pub enum WebMediaEvent {
},
}
pub enum InternalMediaEvent {
VideoStart(EndpointId),
VideoStop(EndpointId),
}
/// Tracks all active media sessions.
pub struct MediaState {
/// Active voice chat session (if any).
@@ -50,18 +56,32 @@ pub struct MediaState {
// Input channels (from Web -> MediaState -> Peers)
pub mic_broadcast: tokio::sync::broadcast::Sender<Vec<f32>>,
pub screen_broadcast: tokio::sync::broadcast::Sender<Vec<u8>>,
// Channel for integrated video player frames (RGBA, width, height)
pub video_frame_tx: tokio::sync::broadcast::Sender<(Vec<u8>, u32, u32)>,
pub mic_bitrate: Arc<AtomicU32>,
pub input_device: Option<String>,
pub output_device: Option<String>,
pub initial_master_volume: f32,
pub initial_mic_volume: f32,
pub initial_noise_suppression: bool,
pub active_video_sessions: Arc<dashmap::DashSet<EndpointId>>,
pub media_event_tx: Option<tokio::sync::mpsc::Sender<InternalMediaEvent>>,
}
impl MediaState {
pub fn new(mic_bitrate: u32, input_device: Option<String>, output_device: Option<String>, master_volume: f32, noise_suppression: bool) -> Self {
pub fn new(
mic_bitrate: u32,
input_device: Option<String>,
output_device: Option<String>,
master_volume: f32,
mic_volume: f32,
noise_suppression: bool,
media_event_tx: Option<tokio::sync::mpsc::Sender<InternalMediaEvent>>,
) -> Self {
let (broadcast_tx, _) = tokio::sync::broadcast::channel(100);
let (mic_broadcast, _) = tokio::sync::broadcast::channel(100);
let (screen_broadcast, _) = tokio::sync::broadcast::channel(100);
let (video_frame_tx, _) = tokio::sync::broadcast::channel(10); // Low buffer for live video
Self {
voice: None,
screen: None,
@@ -69,11 +89,15 @@ impl MediaState {
broadcast_tx,
mic_broadcast,
screen_broadcast,
video_frame_tx,
mic_bitrate: Arc::new(AtomicU32::new(mic_bitrate)),
input_device,
output_device,
initial_master_volume: master_volume,
initial_mic_volume: mic_volume,
initial_noise_suppression: noise_suppression,
active_video_sessions: Arc::new(dashmap::DashSet::new()),
media_event_tx,
}
}
@@ -109,7 +133,8 @@ impl MediaState {
for device in devices {
if let Ok(name) = device.name() {
// Filter out common noise/unusable devices
if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") {
if name.contains("dmix") || name.contains("dsnoop") || name.contains("null")
{
continue;
}
@@ -157,7 +182,8 @@ impl MediaState {
if let Ok(devices) = host.output_devices() {
for device in devices {
if let Ok(name) = device.name() {
if name.contains("dmix") || name.contains("dsnoop") || name.contains("null") {
if name.contains("dmix") || name.contains("dsnoop") || name.contains("null")
{
continue;
}
@@ -191,6 +217,12 @@ impl MediaState {
}
}
pub fn set_mic_volume(&self, volume: f32) {
if let Some(voice) = &self.voice {
voice.set_input_volume(volume);
}
}
pub fn get_volume(&self) -> f32 {
if let Some(voice) = &self.voice {
voice.get_volume()
@@ -199,6 +231,12 @@ impl MediaState {
}
}
pub fn get_mic_volume(&self) -> f32 {
// VoiceChat doesn't strictly expose get_input_volume yet but we know it
// Let's assume we track it or just return initial if not active
self.initial_mic_volume // TODO: Fetch from VoiceChat if active? VoiceChat stores it in Atomic.
}
pub fn is_denoise_enabled(&self) -> bool {
if let Some(voice) = &self.voice {
voice.is_denoise_enabled()
@@ -258,6 +296,7 @@ impl MediaState {
self.input_device.clone(),
self.output_device.clone(), // Added output device
self.initial_master_volume,
self.initial_mic_volume, // Pass initial mic volume
self.initial_noise_suppression,
) {
Ok(vc) => {
@@ -317,6 +356,9 @@ impl MediaState {
mut recv: iroh::endpoint::RecvStream,
) {
let broadcast_tx = self.broadcast_tx.clone();
let video_frame_tx = self.video_frame_tx.clone();
let active_sessions = self.active_video_sessions.clone();
let media_event_tx = self.media_event_tx.clone();
// Spawn a task to determine stream type and handle it
let handle = tokio::spawn(async move {
@@ -325,16 +367,36 @@ impl MediaState {
Ok(msg) => match msg {
MediaStreamMessage::AudioStart { .. } => {
// DEPRECATED in Native Datagram mode
tracing::warn!("Received Audio stream from {} (unexpected in datagram mode)", from);
// We could support stream fallback, but for now we ignore or log.
// Or we can close it.
tracing::warn!(
"Received Audio stream from {} (unexpected in datagram mode)",
from
);
}
MediaStreamMessage::VideoStart { .. } => {
tracing::info!("Accepted Video stream from {:?}", from);
if let Err(e) =
VideoCapture::handle_incoming_video_native(from, msg, recv, broadcast_tx)
.await
{
active_sessions.insert(from);
// Notify start
if let Some(tx) = &media_event_tx {
let _ = tx.send(InternalMediaEvent::VideoStart(from)).await;
}
let result = VideoCapture::handle_incoming_video_native(
from,
msg,
recv,
broadcast_tx,
video_frame_tx,
)
.await;
active_sessions.remove(&from);
// Notify stop
if let Some(tx) = &media_event_tx {
let _ = tx.send(InternalMediaEvent::VideoStop(from)).await;
}
if let Err(e) = result {
tracing::error!("Video native playback error: {}", e);
}
}
@@ -362,15 +424,18 @@ impl MediaState {
/// Handle an incoming datagram (unreliable audio/video).
pub fn handle_incoming_datagram(&mut self, from: EndpointId, data: bytes::Bytes) {
if data.is_empty() { return; }
if data.is_empty() {
return;
}
// Check first byte for type
match data[0] {
1 => { // Audio
1 => {
// Audio
if let Some(voice) = &mut self.voice {
voice.handle_datagram(from, data);
}
},
}
// 2 => Video?
_ => {
// tracing::trace!("Unknown datagram type: {}", data[0]);
@@ -410,9 +475,3 @@ impl MediaState {
}
}
}
impl Drop for MediaState {
fn drop(&mut self) {
self.shutdown();
}
}

98
src/media/playback.rs Normal file
View File

@@ -0,0 +1,98 @@
use anyhow::Result;
use gstreamer::prelude::*;
use gstreamer::{ElementFactory, Pipeline, State};
use gstreamer_app::{AppSink, AppSrc};
use std::sync::{Arc, Mutex};
pub struct VideoPlayer {
pipeline: Pipeline,
appsrc: AppSrc,
appsink: AppSink,
}
impl VideoPlayer {
pub fn new<F>(on_frame: F) -> Result<Self>
where
F: Fn(Vec<u8>, u32, u32) + Send + Sync + 'static,
{
gstreamer::init()?;
// Pipeline: appsrc -> decodebin -> videoconvert -> appsink
// We use decodebin to handle both H.264 and HEVC automatically
// output format RGBA for Iced
let pipeline_str = "appsrc name=src is-live=true format=time do-timestamp=true ! \
parsebin ! \
decodebin ! \
videoconvert ! \
video/x-raw,format=RGBA ! \
appsink name=sink drop=true max-buffers=1 sync=false";
let pipeline = gstreamer::parse::launch(pipeline_str)?
.downcast::<Pipeline>()
.map_err(|_| anyhow::anyhow!("Expected pipeline"))?;
let appsrc = pipeline
.by_name("src")
.ok_or_else(|| anyhow::anyhow!("Missing src"))?
.downcast::<AppSrc>()
.map_err(|_| anyhow::anyhow!("src is not appsrc"))?;
let appsink = pipeline
.by_name("sink")
.ok_or_else(|| anyhow::anyhow!("Missing sink"))?
.downcast::<AppSink>()
.map_err(|_| anyhow::anyhow!("sink is not appsink"))?;
// Set up callback
let callbacks = gstreamer_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gstreamer::FlowError::Error)?;
// Get caps to know width/height
let caps = sample.caps().ok_or(gstreamer::FlowError::Error)?;
let structure = caps.structure(0).ok_or(gstreamer::FlowError::Error)?;
let width = structure
.get::<i32>("width")
.map_err(|_| gstreamer::FlowError::Error)? as u32;
let height = structure
.get::<i32>("height")
.map_err(|_| gstreamer::FlowError::Error)? as u32;
let map = buffer
.map_readable()
.map_err(|_| gstreamer::FlowError::Error)?;
on_frame(map.to_vec(), width, height);
Ok(gstreamer::FlowSuccess::Ok)
})
.build();
appsink.set_callbacks(callbacks);
Ok(Self {
pipeline,
appsrc,
appsink,
})
}
pub fn start(&self) -> Result<()> {
self.pipeline.set_state(State::Playing)?;
Ok(())
}
pub fn stop(&self) -> Result<()> {
let _ = self.pipeline.set_state(State::Null);
Ok(())
}
pub fn push_data(&self, data: &[u8]) -> Result<()> {
let buffer = gstreamer::Buffer::from_slice(data.to_vec());
self.appsrc.push_buffer(buffer)?;
Ok(())
}
}
// Remove Drop impl to prevent panic on shutdown. Rely on explicit stop() or process exit.

View File

@@ -6,11 +6,10 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow};
use dashmap::DashMap;
use nnnoiseless::DenoiseState;
use crate::media::WebMediaEvent;
use anyhow::{anyhow, Result};
use audiopus::{
coder::Decoder as OpusDecoder, coder::Encoder as OpusEncoder, Application, Bitrate, Channels,
SampleRate,
@@ -18,9 +17,10 @@ use audiopus::{
use bytes::Bytes;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use crossbeam_channel::{unbounded, Receiver, Sender};
use dashmap::DashMap;
use iroh::EndpointId;
use nnnoiseless::DenoiseState;
use ringbuf::{traits::*, HeapRb};
use crate::media::WebMediaEvent;
const PACKET_TYPE_AUDIO: u8 = 1;
const FRAME_SIZE_SAMPLES: usize = 960; // 20ms at 48kHz
@@ -51,8 +51,6 @@ impl std::ops::DerefMut for SyncAudioProducer {
type AudioProducer = SyncAudioProducer;
type AudioConsumer = ringbuf::HeapCons<f32>;
/// Main voice chat coordination.
pub struct VoiceChat {
running: Arc<AtomicBool>,
@@ -71,9 +69,16 @@ pub struct VoiceChat {
// Audio processing controls
pub denoise_enabled: Arc<AtomicBool>,
pub output_volume: Arc<AtomicU32>, // stored as f32 bits
pub input_volume: Arc<AtomicU32>, // stored as f32 bits
pub peer_levels: Arc<DashMap<EndpointId, f32>>,
}
// impl Drop for VoiceChat {
// fn drop(&mut self) {
// self.stop();
// }
// }
impl VoiceChat {
/// Start voice chat session (Native Version with CPAL + QUIC Datagrams).
pub fn start_native(
@@ -86,11 +91,13 @@ impl VoiceChat {
input_device_name: Option<String>,
output_device_name: Option<String>,
initial_volume: f32,
initial_mic_volume: f32,
initial_denoise: bool,
) -> Result<Self> {
let running = Arc::new(AtomicBool::new(true));
let denoise_enabled = Arc::new(AtomicBool::new(initial_denoise));
let output_volume = Arc::new(AtomicU32::new(initial_volume.to_bits()));
let input_volume = Arc::new(AtomicU32::new(initial_mic_volume.to_bits()));
let peer_levels = Arc::new(DashMap::new());
tracing::info!("Starting Native Voice Chat...");
@@ -103,7 +110,13 @@ impl VoiceChat {
let playback_levels = peer_levels.clone();
let playback_thread = thread::spawn(move || {
run_playback_loop(playback_running, new_peer_rx, playback_device_name, playback_volume, playback_levels);
run_playback_loop(
playback_running,
new_peer_rx,
playback_device_name,
playback_volume,
playback_levels,
);
});
// 2. Setup Capture Thread (CPAL Input)
@@ -111,9 +124,16 @@ impl VoiceChat {
let mic_tx_capture = mic_tx.clone();
let capture_device_name = input_device_name.clone();
let capture_denoise = denoise_enabled.clone();
let capture_volume = input_volume.clone();
let capture_thread = thread::spawn(move || {
run_capture_loop(capture_running, mic_tx_capture, capture_device_name, capture_denoise);
run_capture_loop(
capture_running,
mic_tx_capture,
capture_device_name,
capture_denoise,
capture_volume,
);
});
// 3. Setup Network Sender Task (Opus -> Datagrams)
@@ -130,7 +150,8 @@ impl VoiceChat {
mic_rx_sender,
sender_running,
mic_bitrate_clone,
).await;
)
.await;
});
tasks.push(sender_task);
@@ -143,12 +164,18 @@ impl VoiceChat {
new_peer_tx,
denoise_enabled,
output_volume,
input_volume,
peer_levels,
})
}
pub fn set_volume(&self, volume: f32) {
self.output_volume.store(volume.to_bits(), Ordering::Relaxed);
self.output_volume
.store(volume.to_bits(), Ordering::Relaxed);
}
pub fn set_input_volume(&self, volume: f32) {
self.input_volume.store(volume.to_bits(), Ordering::Relaxed);
}
pub fn get_volume(&self) -> f32 {
@@ -166,7 +193,10 @@ impl VoiceChat {
}
pub fn get_peer_levels(&self) -> HashMap<EndpointId, f32> {
self.peer_levels.iter().map(|entry| (*entry.key(), *entry.value())).collect()
self.peer_levels
.iter()
.map(|entry| (*entry.key(), *entry.value()))
.collect()
}
// Kept for compatibility but unused in Native mode
@@ -188,6 +218,7 @@ impl VoiceChat {
}
self.tasks.clear();
// Spawn a thread to join audio threads to avoid blocking async context if they are stuck
if let Some(t) = self.capture_thread.take() {
t.thread().unpark();
}
@@ -254,18 +285,26 @@ fn run_capture_loop(
mic_tx: tokio::sync::broadcast::Sender<Vec<f32>>,
device_name: Option<String>,
denoise_enabled: Arc<AtomicBool>,
input_volume: Arc<AtomicU32>,
) {
let host = cpal::default_host();
// Find device
let device = if let Some(ref name) = device_name {
host.input_devices().ok().and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false)))
tracing::info!("Requesting input device: '{}'", name);
host.input_devices()
.ok()
.and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false)))
} else {
tracing::info!("Requesting default input device");
host.default_input_device()
};
let device = match device {
Some(d) => d,
Some(d) => {
tracing::info!("Found input device: {:?}", d.name());
d
}
None => {
tracing::error!("No input device found");
return;
@@ -293,6 +332,9 @@ fn run_capture_loop(
let mut processing_buffer: Vec<f32> = Vec::with_capacity(480 * 2);
let mut out_buf = [0.0f32; DenoiseState::FRAME_SIZE];
let mut last_log = Instant::now();
let mut packet_count = 0;
let err_fn = |err| tracing::error!("Input stream error: {}", err);
let stream = match config.sample_format() {
@@ -303,14 +345,29 @@ fn run_capture_loop(
move |data: &[f32], _: &_| {
if !running_clone.load(Ordering::Relaxed) { return; }
packet_count += 1;
if last_log.elapsed() >= Duration::from_secs(1) {
tracing::info!("Microphone input active: {} callbacks/sec, {} samples in last callback", packet_count, data.len());
packet_count = 0;
last_log = Instant::now();
}
// Convert to Mono
let channels = stream_config.channels as usize;
let mono_samples: Vec<f32> = if channels == 1 {
let mut mono_samples: Vec<f32> = if channels == 1 {
data.to_vec()
} else {
data.chunks(channels).map(|chunk| chunk.iter().sum::<f32>() / channels as f32).collect()
};
// Apply Input Gain
let gain = f32::from_bits(input_volume.load(Ordering::Relaxed));
if gain != 1.0 {
for sample in &mut mono_samples {
*sample *= gain;
}
}
if !mono_samples.is_empty() {
let use_denoise = denoise_enabled.load(Ordering::Relaxed);
@@ -331,7 +388,7 @@ fn run_capture_loop(
err_fn,
None
)
},
}
_ => {
tracing::error!("Input device does not support F32 samples");
return;
@@ -363,13 +420,20 @@ fn run_playback_loop(
let host = cpal::default_host();
let device = if let Some(ref name) = device_name {
host.output_devices().ok().and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false)))
tracing::info!("Requesting output device: '{}'", name);
host.output_devices()
.ok()
.and_then(|mut ds| ds.find(|d| d.name().map(|n| n == *name).unwrap_or(false)))
} else {
tracing::info!("Requesting default output device");
host.default_output_device()
};
let device = match device {
Some(d) => d,
Some(d) => {
tracing::info!("Found output device: {:?}", d.name());
d
}
None => {
tracing::error!("No output device found");
return;
@@ -391,13 +455,25 @@ fn run_playback_loop(
let err_fn = |err| tracing::error!("Output stream error: {}", err);
let mut last_log = Instant::now();
let mut packet_count = 0;
let stream = match config.sample_format() {
cpal::SampleFormat::F32 => {
let running_clone = running.clone();
device.build_output_stream(
&stream_config,
move |data: &mut [f32], _: &_| {
if !running_clone.load(Ordering::Relaxed) { return; }
if !running_clone.load(Ordering::Relaxed) {
return;
}
packet_count += 1;
if last_log.elapsed() >= Duration::from_secs(1) {
tracing::info!("Speaker output active: {} callbacks/sec", packet_count);
packet_count = 0;
last_log = Instant::now();
}
let master_vol = f32::from_bits(output_volume.load(Ordering::Relaxed));
@@ -455,9 +531,9 @@ fn run_playback_loop(
}
},
err_fn,
None
None,
)
},
}
_ => {
tracing::error!("Output device does not support F32 samples");
return;
@@ -484,7 +560,9 @@ async fn run_network_sender(
running: Arc<AtomicBool>,
mic_bitrate: Arc<AtomicU32>,
) {
if peers.is_empty() { return; }
if peers.is_empty() {
return;
}
// Initialize connections
let mut connections = Vec::new();
@@ -504,7 +582,9 @@ async fn run_network_sender(
.expect("Failed to create Opus encoder");
// Initial bitrate
let _ = encoder.set_bitrate(Bitrate::BitsPerSecond(mic_bitrate.load(Ordering::Relaxed) as i32));
let _ = encoder.set_bitrate(Bitrate::BitsPerSecond(
mic_bitrate.load(Ordering::Relaxed) as i32
));
let mut pcm_buffer: Vec<f32> = Vec::with_capacity(FRAME_SIZE_SAMPLES * 2);
let mut opus_buffer = vec![0u8; 1500];

View File

@@ -73,6 +73,8 @@ pub struct PeerInfo {
pub is_self: bool,
#[serde(skip)]
pub audio_level: f32,
#[serde(skip)]
pub is_streaming_video: bool,
}
/// Manages the iroh networking stack.
@@ -207,6 +209,7 @@ impl NetworkManager {
capabilities: None,
is_self: false,
audio_level: 0.0,
is_streaming_video: false,
});
}
let _ = event_tx.send(NetEvent::PeerUp(peer_id)).await;

View File

@@ -64,9 +64,11 @@ pub async fn start_web_server(
axum::serve(listener, app).await.unwrap();
}
// --- AUDIO ---
async fn ws_audio_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
async fn ws_audio_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_audio_socket(socket, state))
}
@@ -77,7 +79,11 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) {
// Outgoing (Server -> Browser)
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if let WebMediaEvent::Audio { peer_id, data: samples } = event {
if let WebMediaEvent::Audio {
peer_id,
data: samples,
} = event
{
// Protocol: [IDLen] [ID] [f32...]
let id_bytes = peer_id.as_bytes();
let id_len = id_bytes.len() as u8;
@@ -87,7 +93,11 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) {
for s in samples {
payload.extend_from_slice(&s.to_ne_bytes());
}
if sender.send(Message::Binary(Bytes::from(payload))).await.is_err() {
if sender
.send(Message::Binary(Bytes::from(payload)))
.await
.is_err()
{
break;
}
}
@@ -111,7 +121,10 @@ async fn handle_audio_socket(socket: WebSocket, state: AppState) {
}
// --- SCREEN ---
async fn ws_screen_handler(ws: WebSocketUpgrade, State(state): State<AppState>) -> impl IntoResponse {
async fn ws_screen_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_screen_socket(socket, state))
}
@@ -122,7 +135,12 @@ async fn handle_screen_socket(socket: WebSocket, state: AppState) {
// Outgoing (Server -> Browser)
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if let WebMediaEvent::Video { peer_id, kind, data } = event {
if let WebMediaEvent::Video {
peer_id,
kind,
data,
} = event
{
if matches!(kind, MediaKind::Screen) {
let id_bytes = peer_id.as_bytes();
let id_len = id_bytes.len() as u8;
@@ -131,7 +149,11 @@ async fn handle_screen_socket(socket: WebSocket, state: AppState) {
payload.extend_from_slice(id_bytes);
payload.extend_from_slice(&data);
if sender.send(Message::Binary(Bytes::from(payload))).await.is_err() {
if sender
.send(Message::Binary(Bytes::from(payload)))
.await
.is_err()
{
break;
}
}

View File

@@ -290,8 +290,9 @@ screen_resolution = "1920x1080"
// ============================================================================
mod tui_tests {
use crossterm::event::{KeyCode, KeyEvent, KeyModifiers};
use p2p_chat::app_logic::AppCommand;
use p2p_chat::config::{Theme, UiConfig};
use p2p_chat::tui::{App, InputMode, TuiCommand};
use p2p_chat::tui::{App, InputMode};
fn make_app() -> App {
let theme: Theme = UiConfig::default().into();
@@ -320,7 +321,7 @@ mod tui_tests {
let mut app = make_app();
assert_eq!(app.input_mode, InputMode::Editing);
let cmd = app.handle_key(key(KeyCode::Esc));
assert!(matches!(cmd, TuiCommand::None));
assert!(matches!(cmd, AppCommand::None));
assert_eq!(app.input_mode, InputMode::Normal);
}
@@ -329,7 +330,7 @@ mod tui_tests {
let mut app = make_app();
app.input_mode = InputMode::Normal;
let cmd = app.handle_key(key(KeyCode::Char('i')));
assert!(matches!(cmd, TuiCommand::None));
assert!(matches!(cmd, AppCommand::None));
assert_eq!(app.input_mode, InputMode::Editing);
}
@@ -338,7 +339,7 @@ mod tui_tests {
let mut app = make_app();
app.input_mode = InputMode::Normal;
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::None));
assert!(matches!(cmd, AppCommand::None));
assert_eq!(app.input_mode, InputMode::Editing);
}
@@ -347,7 +348,7 @@ mod tui_tests {
let mut app = make_app();
app.input_mode = InputMode::Normal;
let cmd = app.handle_key(key(KeyCode::Char('/')));
assert!(matches!(cmd, TuiCommand::None));
assert!(matches!(cmd, AppCommand::None));
assert_eq!(app.input_mode, InputMode::Editing);
assert_eq!(app.input, "/");
assert_eq!(app.cursor_position, 1);
@@ -358,7 +359,7 @@ mod tui_tests {
let mut app = make_app();
app.input_mode = InputMode::Normal;
let cmd = app.handle_key(key(KeyCode::Char('q')));
assert!(matches!(cmd, TuiCommand::Quit));
assert!(matches!(cmd, AppCommand::Quit));
}
#[test]
@@ -388,7 +389,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char('h')));
app.handle_key(key(KeyCode::Char('i')));
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::SendMessage(ref s) if s == "hi"));
assert!(matches!(cmd, AppCommand::SendMessage(ref s) if s == "hi"));
assert_eq!(app.input, "");
assert_eq!(app.cursor_position, 0);
}
@@ -397,7 +398,7 @@ mod tui_tests {
fn enter_on_empty_input_does_nothing() {
let mut app = make_app();
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::None));
assert!(matches!(cmd, AppCommand::None));
}
#[test]
@@ -407,7 +408,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::Quit));
assert!(matches!(cmd, AppCommand::Quit));
}
#[test]
@@ -417,7 +418,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::SystemMessage(_)));
assert!(matches!(cmd, AppCommand::SystemMessage(_)));
}
#[test]
@@ -427,7 +428,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::SystemMessage(_)));
assert!(matches!(cmd, AppCommand::SystemMessage(_)));
}
#[test]
@@ -437,7 +438,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::ChangeNick(ref s) if s == "alice"));
assert!(matches!(cmd, AppCommand::ChangeNick(ref s) if s == "alice"));
}
#[test]
@@ -447,7 +448,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::SystemMessage(_)));
assert!(matches!(cmd, AppCommand::SystemMessage(_)));
}
#[test]
@@ -457,7 +458,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::Connect(ref s) if s == "abc123"));
assert!(matches!(cmd, AppCommand::Connect(ref s) if s == "abc123"));
}
#[test]
@@ -467,17 +468,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::ToggleVoice));
}
#[test]
fn camera_command() {
let mut app = make_app();
for c in "/camera".chars() {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::ToggleCamera));
assert!(matches!(cmd, AppCommand::ToggleVoice));
}
#[test]
@@ -487,7 +478,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::ToggleScreen));
assert!(matches!(cmd, AppCommand::ToggleScreen));
}
#[test]
@@ -497,7 +488,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::Leave));
assert!(matches!(cmd, AppCommand::Leave));
}
#[test]
@@ -507,7 +498,7 @@ mod tui_tests {
app.handle_key(key(KeyCode::Char(c)));
}
let cmd = app.handle_key(key(KeyCode::Enter));
assert!(matches!(cmd, TuiCommand::SystemMessage(_)));
assert!(matches!(cmd, AppCommand::SystemMessage(_)));
}
#[test]
@@ -518,7 +509,7 @@ mod tui_tests {
}
let cmd = app.handle_key(key(KeyCode::Enter));
match cmd {
TuiCommand::SendFile(path) => {
AppCommand::SendFile(path) => {
assert_eq!(path.to_str().unwrap(), "/tmp/test.txt");
}
_ => panic!("Expected SendFile, got {:?}", cmd),