// Source file: p2p-chat/src/media/capture.rs (974 lines, 35 KiB, Rust)

//! Video capture and playback using FFmpeg and MPV.
//!
//! Captures video by spawning an `ffmpeg` process and reading its stdout.
//! Plays video by spawning `mpv` (or `vlc`) and writing to its stdin.
use anyhow::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing;
use crate::media::WebMediaEvent;
use crate::protocol::{decode_framed, write_framed, MediaKind, MediaStreamMessage};
use std::process::Stdio;
use tokio::io::{AsyncReadExt, BufReader, AsyncWriteExt, AsyncBufReadExt};
use tokio::process::Command;
use xcap::Monitor;
use ashpd::desktop::{PersistMode, screencast::{CursorMode, Screencast, SourceType}};
/// Keeps the XDG desktop portal screencast session alive for the duration of a
/// capture: the portal revokes the PipeWire node once the proxy/session are
/// dropped, so the capture task holds an instance of this struct.
struct AshpdKeeper {
    // Held only to keep the portal connection open; never read.
    _proxy: Screencast<'static>,
    _session: ashpd::desktop::Session<'static, Screencast<'static>>,
}
/// Manages a video capture session (camera or screen).
pub struct VideoCapture {
    /// Shared stop flag polled by every capture/sender task.
    running: Arc<AtomicBool>,
    /// Handles of the spawned capture and per-peer sender tasks; aborted on stop.
    tasks: Vec<tokio::task::JoinHandle<()>>,
}
impl VideoCapture {
/// Start web-based screen share (relay from Web to Peers).
pub async fn start_web_screen(
peers: Vec<iroh::EndpointId>,
network_manager: crate::net::NetworkManager,
source_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
) -> Result<Self> {
let running = Arc::new(AtomicBool::new(true));
let mut tasks = Vec::new();
// For each peer, spawn a sender task that subscribes to the source
for peer in peers {
let running = running.clone();
let net = network_manager.clone();
let rx = source_tx.subscribe();
let kind = MediaKind::Screen;
tasks.push(tokio::spawn(async move {
if let Err(e) = run_video_sender_native(net, peer, kind, rx, running).await {
tracing::error!("Video sender web screen error: {}", e);
}
}));
}
Ok(Self {
running,
tasks,
})
}
/// Start web-based camera share (relay from Web to Peers).
pub async fn start_web_camera(
peers: Vec<iroh::EndpointId>,
network_manager: crate::net::NetworkManager,
source_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
) -> Result<Self> {
let running = Arc::new(AtomicBool::new(true));
let mut tasks = Vec::new();
// For each peer, spawn a sender task that subscribes to the source
for peer in peers {
let running = running.clone();
let net = network_manager.clone();
let rx = source_tx.subscribe();
let kind = MediaKind::Camera;
tasks.push(tokio::spawn(async move {
if let Err(e) = run_video_sender_native(net, peer, kind, rx, running).await {
tracing::error!("Video sender web camera error: {}", e);
}
}));
}
Ok(Self {
running,
tasks,
})
}
/// Start native video capture via FFmpeg.
pub async fn start_native(
kind: MediaKind,
_local_peer_id: iroh::EndpointId,
peers: Vec<iroh::EndpointId>,
network_manager: crate::net::NetworkManager,
broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
) -> Result<Self> {
let running = Arc::new(AtomicBool::new(true));
// Channel to distribute frames from FFmpeg to peer senders
let (frame_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(100);
let mut tasks = Vec::new();
// 1. Spawn Capture task
let capture_running = running.clone();
let frame_tx_clone = frame_tx.clone();
let broadcast_tx_clone = broadcast_tx.clone();
let kind_clone = kind.clone();
tasks.push(tokio::spawn(async move {
let result = if matches!(kind_clone, MediaKind::Screen) {
// Try Wayland Portal first
match select_wayland_source().await {
Ok((node_id, keeper)) => {
tracing::info!("Selected Wayland source node: {}", node_id);
run_pipewire_capture(node_id, keeper, frame_tx_clone, broadcast_tx_clone, capture_running).await
}
Err(e) => {
tracing::warn!("Wayland source selection failed ({}). Falling back to xcap.", e);
run_xcap_capture(frame_tx_clone, broadcast_tx_clone, capture_running).await
}
}
} else {
run_ffmpeg_capture(kind_clone, frame_tx_clone, broadcast_tx_clone, capture_running).await
};
if let Err(e) = result {
tracing::error!("Capture error: {}", e);
}
}));
// 2. Spawn peer sender tasks (reuse run_video_sender_web logic but with internal channel)
for peer in peers {
let running = running.clone();
let net = network_manager.clone();
let rx = frame_tx.subscribe();
let kind = kind.clone();
tasks.push(tokio::spawn(async move {
if let Err(e) = run_video_sender_native(net, peer, kind, rx, running).await {
tracing::error!("Video sender native error: {}", e);
}
}));
}
Ok(Self {
running,
tasks,
})
}
/// Stop capture.
pub fn stop(&mut self) {
self.running.store(false, Ordering::Relaxed);
for task in self.tasks.drain(..) {
task.abort();
}
}
/// Handle incoming video stream from a peer (Native MPV Version).
/// Receives video frames (e.g. H.264/HEVC encoded inside protocol messages) and pipes them to MPV.
pub async fn handle_incoming_video_native(
from: iroh::EndpointId,
message: MediaStreamMessage,
mut recv: iroh::endpoint::RecvStream,
_broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
) -> Result<()> {
let kind = match message {
MediaStreamMessage::VideoStart { kind, .. } => kind,
_ => anyhow::bail!("Expected VideoStart"),
};
tracing::info!("Starting {:?} stream handler for {} (Native MPV)", kind, from);
// Spawn mpv
let mut cmd = Command::new("mpv");
cmd.args(&[
"--no-terminal",
"--ontop",
"--profile=low-latency",
"--cache=no",
"--force-window",
"-", // Read from stdin
]);
cmd.stdin(Stdio::piped());
// We might want to quell stdout/stderr or log them
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
// Ensure process is killed when this task drops
cmd.kill_on_drop(true);
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to spawn mpv: {}", e);
return Err(anyhow::anyhow!("Failed to spawn mpv: {}", e));
}
};
let mut stdin = child.stdin.take().expect("Failed to open mpv stdin");
use tokio::io::AsyncWriteExt;
loop {
let msg: MediaStreamMessage = match decode_framed(&mut recv).await {
Ok(m) => m,
Err(_) => break, // EOF
};
match msg {
MediaStreamMessage::VideoFrame { data, .. } => {
// Write directly to mpv stdin
// The data is already NAL units with start codes (from our capture logic)
// Note: 'data' from VideoFrame contains [1 byte type][NAL Unit including start code]
// We need to skip the first byte which is our protocol's frame type indicator (Key/Delta)
// Wait, let's check run_ffmpeg_capture.
// It does: payload.push(frame_type); payload.extend_from_slice(&nal_data);
// So yes, we need to skip the first byte.
if data.len() > 1 {
if let Err(e) = stdin.write_all(&data[1..]).await {
tracing::error!("Failed to write to mpv: {}", e);
break;
}
}
}
MediaStreamMessage::VideoStop { .. } => {
tracing::info!("Peer stopped video");
break;
}
_ => {}
}
}
let _ = child.kill().await;
Ok(())
}
}
impl Drop for VideoCapture {
    fn drop(&mut self) {
        // Make sure background capture/sender tasks are stopped even if the
        // caller never called `stop()` explicitly.
        self.stop();
    }
}
// ---------------------------------------------------------------------------
// XCAP Capture Logic
// ---------------------------------------------------------------------------
/// Capture the primary monitor with `xcap`, pipe raw RGBA frames into an
/// FFmpeg HEVC encoder, and fan the encoded Annex B NAL units out to peers
/// (`frame_tx`) and the local web preview (`broadcast_preview`).
///
/// Runs until `running` is cleared or FFmpeg's stdout reaches EOF.
async fn run_xcap_capture(
    frame_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
    broadcast_preview: tokio::sync::broadcast::Sender<WebMediaEvent>,
    running: Arc<AtomicBool>,
) -> Result<()> {
    // 1. Pick a monitor (first one for now).
    let monitors = Monitor::all().map_err(|e| anyhow::anyhow!("Failed to list monitors: {}", e))?;
    if monitors.is_empty() {
        return Err(anyhow::anyhow!("No monitors found"));
    }
    let monitor = &monitors[0];
    let width = monitor.width().map_err(|e| anyhow::anyhow!("Failed to get monitor width: {}", e))?;
    let height = monitor.height().map_err(|e| anyhow::anyhow!("Failed to get monitor height: {}", e))?;
    let name = monitor.name().unwrap_or_else(|_| "Unknown Monitor".to_string());
    tracing::info!("Starting xcap capture on monitor: {} ({}x{})", name, width, height);
    // 2. FFmpeg encoder fed raw RGBA frames on stdin.
    let mut cmd = Command::new("ffmpeg");
    cmd.kill_on_drop(true);
    cmd.args(&[
        "-f", "rawvideo",
        "-pixel_format", "rgba",
        "-video_size", &format!("{}x{}", width, height),
        "-framerate", "30",
        "-i", "-", // Read raw frames from stdin
    ]);
    // Output: HEVC NVENC to a raw Annex B stream.
    // NOTE(review): unlike run_ffmpeg_capture there is no software fallback
    // here; machines without NVENC fail this whole path.
    cmd.args(&[
        "-vf", "scale=1280:720", // Force 720p resize
        "-c:v", "hevc_nvenc", // Try hardware first
        "-b:v", "1M", // Lower bitrate to 1Mbps
        "-g", "30", // Keyframe interval (GOP) 30
        "-zerolatency", "1",
        "-preset", "p4",
        "-f", "hevc",
        "-",
    ]);
    cmd.stdin(Stdio::piped());
    cmd.stdout(Stdio::piped());
    // Log FFmpeg stderr to a file for debugging; if the file cannot be created
    // (e.g. read-only working directory), discard stderr instead of panicking.
    match std::fs::File::create("ffmpeg_xcap.log") {
        Ok(f) => {
            cmd.stderr(Stdio::from(f));
        }
        Err(e) => {
            tracing::warn!("Could not create ffmpeg_xcap.log ({}); discarding ffmpeg stderr", e);
            cmd.stderr(Stdio::null());
        }
    }
    let mut child = cmd.spawn()?;
    let mut stdin = child.stdin.take().expect("Failed to open ffmpeg stdin");
    let stdout = child.stdout.take().expect("Failed to open ffmpeg stdout");
    // 3. xcap is blocking, so capture on a dedicated thread and hand frames to
    //    the async stdin writer through a small bounded channel.
    let running_clone = running.clone();
    let monitor_clone = monitor.clone();
    let (img_tx, mut img_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(2);
    std::thread::spawn(move || {
        // Target ~30fps.
        let frame_duration = std::time::Duration::from_millis(33);
        while running_clone.load(Ordering::Relaxed) {
            let start = std::time::Instant::now();
            match monitor_clone.capture_image() {
                Ok(image) => {
                    // RgbaImage -> raw bytes for FFmpeg's rawvideo input.
                    let bytes = image.into_raw();
                    if img_tx.blocking_send(bytes).is_err() {
                        break; // receiver gone: writer task ended
                    }
                }
                Err(e) => {
                    eprintln!("xcap capture failed: {}", e);
                    // Possibly transient; back off briefly instead of bailing.
                    std::thread::sleep(std::time::Duration::from_millis(100));
                }
            }
            // Pace to the target framerate, always yielding a little CPU even
            // when a frame took too long.
            let elapsed = start.elapsed();
            if elapsed < frame_duration {
                std::thread::sleep(frame_duration - elapsed);
            } else {
                std::thread::sleep(std::time::Duration::from_millis(1));
            }
        }
    });
    // 4. Async writer: raw frames -> ffmpeg stdin.
    let stdin_task = tokio::spawn(async move {
        while let Some(frame_data) = img_rx.recv().await {
            if stdin.write_all(&frame_data).await.is_err() {
                break;
            }
        }
    });
    // 5. Read FFmpeg's encoded output and split it into Annex B NAL units.
    let mut reader = BufReader::new(stdout);
    let mut buffer = Vec::with_capacity(1024 * 1024);
    let mut temp_buf = [0u8; 4096];
    loop {
        if !running.load(Ordering::Relaxed) {
            break;
        }
        let n = match reader.read(&mut temp_buf).await {
            Ok(0) => break, // EOF
            Ok(n) => n,
            Err(_) => break,
        };
        buffer.extend_from_slice(&temp_buf[0..n]);
        // A NAL unit spans from one start code to the next; wait for the next
        // start code before emitting the current unit.
        while let Some(start_idx) = find_start_code(&buffer) {
            let end_idx = match find_start_code_from(&buffer, start_idx + 4) {
                Some(next_start) => next_start,
                None => break, // Wait for more data
            };
            let nal_data = buffer.drain(start_idx..end_idx).collect::<Vec<u8>>();
            // 3-byte (00 00 01) vs 4-byte (00 00 00 01) start code.
            let start_code_len = if nal_data[2] == 1 { 3 } else { 4 };
            // Skip degenerate NALs (back-to-back start codes): indexing the
            // NAL header below would panic on them.
            if nal_data.len() <= start_code_len {
                continue;
            }
            // HEVC NAL type is bits 1-6 (0x7E) of the first header byte.
            let nal_header_byte = nal_data[start_code_len];
            let nal_type = (nal_header_byte & 0x7E) >> 1;
            // Key-ish units: 16-21 IRAP pictures, 32-34 VPS/SPS/PPS.
            let is_key = (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34);
            let frame_type = if is_key { 0u8 } else { 1u8 };
            // Payload layout: [1-byte frame type][NAL incl. start code].
            let mut payload: Vec<u8> = Vec::with_capacity(1 + nal_data.len());
            payload.push(frame_type);
            payload.extend_from_slice(&nal_data);
            let _ = frame_tx.send(payload.clone());
            let _ = broadcast_preview.send(WebMediaEvent::Video {
                peer_id: "local".to_string(),
                kind: MediaKind::Screen,
                data: payload,
            });
        }
    }
    let _ = child.kill().await;
    stdin_task.abort();
    Ok(())
}
// ---------------------------------------------------------------------------
// FFmpeg Capture Logic
// ---------------------------------------------------------------------------
async fn run_video_sender_native(
network_manager: crate::net::NetworkManager,
peer: iroh::EndpointId,
kind: MediaKind,
mut input_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
running: Arc<AtomicBool>,
) -> Result<()> {
let (mut send, _) = network_manager
.open_media_stream(peer, kind.clone())
.await?;
// Send Start message
write_framed(
&mut send,
&MediaStreamMessage::VideoStart {
kind,
width: 1280, // Target 720p
height: 720,
fps: 30,
},
)
.await?;
while running.load(Ordering::Relaxed) {
match input_rx.recv().await {
Ok(data) => {
// FFmpeg data is already [FrameType][VP8 Chunk], see run_ffmpeg_capture
// Just wrap in protocol message
let msg = MediaStreamMessage::VideoFrame {
sequence: 0,
timestamp_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
data,
};
if write_framed(&mut send, &msg).await.is_err() {
break;
}
}
Err(_) => break,
}
}
let _ = write_framed(&mut send, &MediaStreamMessage::VideoStop { kind }).await;
send.finish()?;
Ok(())
}
async fn run_ffmpeg_capture(
kind: MediaKind,
frame_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
broadcast_preview: tokio::sync::broadcast::Sender<WebMediaEvent>,
running: Arc<AtomicBool>,
) -> Result<()> {
let mut cmd = Command::new("ffmpeg");
cmd.kill_on_drop(true);
// Output args: Robust Encoder Selection
// Try: hevc_nvenc -> h264_nvenc -> libx264
struct EncoderConfig {
name: &'static str,
codec: &'static str,
opts: Vec<&'static str>,
format: &'static str, // "hevc" or "h264" (raw stream format)
filter: &'static str, // "hevc_mp4toannexb" or "h264_mp4toannexb"
pixel_format: Option<&'static str>, // Force pixel format if needed
}
let encoders = vec![
EncoderConfig {
name: "hevc_nvenc (Hardware)",
codec: "hevc_nvenc",
opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
format: "hevc",
filter: "hevc_mp4toannexb",
pixel_format: None, // NVENC usually handles formats well
},
EncoderConfig {
name: "h264_nvenc (Hardware Fallback)",
codec: "h264_nvenc",
opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
format: "h264",
filter: "h264_mp4toannexb",
pixel_format: None,
},
EncoderConfig {
name: "libx264 (Software Fallback)",
codec: "libx264",
opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"],
format: "h264",
filter: "h264_mp4toannexb",
pixel_format: Some("yuv420p"), // libx264 often needs yuv420p
},
];
let mut final_child = None;
let mut chosen_filter = "";
// We need to keep the stdout/stderr open
for enc in &encoders {
tracing::info!("Trying encoder: {}", enc.name);
let mut cmd = Command::new("ffmpeg");
cmd.kill_on_drop(true);
// Input args (re-applied for each attempt)
// TODO: Detect platform/device better. For now assuming Linux/V4L2.
match kind {
MediaKind::Camera => {
cmd.args(&[
"-f", "v4l2",
"-framerate", "30",
"-video_size", "1280x720",
"-i", "/dev/video0",
]);
}
MediaKind::Screen => {
// Always use x11grab (works on X11 and XWayland)
let display_env = std::env::var("DISPLAY").unwrap_or_else(|_| ":0.0".to_string());
tracing::info!("Using x11grab on display: {}", display_env);
cmd.args(&[
"-f", "x11grab",
"-framerate", "30",
"-video_size", "1920x1080", // Input size (assuming 1080p for now, but safer to autodect or be large)
"-i", &display_env,
"-vf", "scale=1280:720", // Force 720p resize
]);
}
_ => return Ok(()),
}
// Pixel format if needed
if let Some(pix_fmt) = enc.pixel_format {
cmd.args(&["-pix_fmt", pix_fmt]);
}
// Encoder args
cmd.arg("-c:v").arg(enc.codec);
cmd.args(&enc.opts);
// Bitstream filter to ensure Annex B (start codes)
cmd.arg("-bsf:v").arg(enc.filter);
// Output format
cmd.arg("-f").arg(enc.format);
cmd.arg("-");
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
match cmd.spawn() {
Ok(mut child) => {
// Wait a bit to see if it crashes immediately
// We sleep for 500ms to let ffmpeg initialize
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
if let Ok(Some(status)) = child.try_wait() {
tracing::warn!("Encoder {} failed immediately with status: {}", enc.name, status);
// Read stderr to see why
if let Some(mut stderr) = child.stderr.take() {
let mut err_buf = String::new();
let _ = stderr.read_to_string(&mut err_buf).await;
tracing::warn!("FFmpeg stderr: {}", err_buf);
}
continue; // Try next
}
// It seems to be running
tracing::info!("Selected encoder: {}", enc.name);
tracing::info!("Capture loop started");
// Redirect stderr to log file or tracing
if let Some(stderr) = child.stderr.take() {
// Spawn a task to log stderr line by line
let running_log = running.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
while running_log.load(Ordering::Relaxed) {
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
// Log errors or critical warnings
if line.contains("Error") || line.contains("error") || line.contains("fail") {
tracing::error!("FFmpeg: {}", line.trim());
}
line.clear();
}
Err(_) => break,
}
}
});
}
final_child = Some(child);
chosen_filter = enc.filter;
break;
}
Err(e) => {
tracing::warn!("Failed to spawn encoder {}: {}", enc.name, e);
continue;
}
}
}
let mut child = match final_child {
Some(c) => c,
None => {
tracing::error!("All encoders failed. Cannot start capture.");
return Err(anyhow::anyhow!("All encoders failed"));
}
};
let stdout = child.stdout.take().expect("Failed to open stdout");
// We don't need BufReader for raw check if we just read blocks, but fine to use it or just AsyncRead
let mut reader = BufReader::new(stdout);
// Raw H.264 parsing (Annex B)
// Stream is a sequence of NAL units, each starting with 00 00 00 01 (or 00 00 01)
// We need to buffer and split.
let mut buffer = Vec::with_capacity(1024 * 1024); // 1MB buffer
let mut temp_buf = [0u8; 4096];
while running.load(Ordering::Relaxed) {
let n = match reader.read(&mut temp_buf).await {
Ok(0) => break, // EOF
Ok(n) => n,
Err(_) => break,
};
buffer.extend_from_slice(&temp_buf[0..n]);
// Find NAL units in buffer
// A NAL unit starts with 00 00 00 01
// We look for start codes.
while let Some(start_idx) = find_start_code(&buffer) {
// If we found a start code at index 0, we can't extract a frame yet
// unless we have another start code later.
// But actually, the buffer MIGHT have multiple NALs.
// We need to find the NEXT start code to know where this one ends.
// Search from start_idx + 4
let end_idx = if let Some(next_start) = find_start_code_from(&buffer, start_idx + 4) {
next_start
} else {
// No next start code yet.
// If the buffer is getting huge, maybe we should just consume it?
// But for H.264 streaming we must be precise.
// Wait for more data.
break;
};
// Extract NAL unit (including start code? Browsers usually want it for AVC1/AnnexB)
// WebCodecs EncodedVideoChunk expects:
// "For 'avc1' (H.264), the chunk data must be an Annex B NAL unit."
// So we include the 00 00 00 01.
let nal_data = buffer.drain(start_idx..end_idx).collect::<Vec<u8>>();
// Send NAL
// Frame Type detection for H.264:
// NAL type is in the first byte AFTER the start code.
// Start Code is 00 00 00 01 (4 bytes) or 00 00 01 (3 bytes)
// find_start_code finds 00 00 00 01.
// Let's handle 3-byte start codes too? ffmpeg -f h264 usually sends 4-byte.
// Check if it's 3-byte or 4-byte start code
let start_code_len = if nal_data[2] == 1 { 3 } else { 4 };
// Construct payload
let mut payload: Vec<u8> = Vec::with_capacity(1 + nal_data.len());
// Check NAL type
// nal_data[start_code_len] is the NAL header (first byte).
let nal_header_byte = nal_data[start_code_len];
let is_key = if chosen_filter.contains("hevc") {
// HEVC (H.265)
// Type is bits 1-6 (0x7E) shifted right by 1.
let nal_type = (nal_header_byte & 0x7E) >> 1;
// HEVC Keyframes:
// 16-21: IRAP (BLA_W_LP, BLA_W_RADL, BLA_N_LP, IDR_W_RADL, IDR_N_LP, CRA_NUT)
// 32-34: VPS, SPS, PPS (Parameters - treat as critical/key)
(nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34)
} else {
// H.264 (AVC)
// Type is lower 5 bits (0x1F)
let nal_type = nal_header_byte & 0x1F;
// H.264 Keyframes:
// 5: IDR (Instantaneous Decoding Refresh) - Keyframe
// 7: SPS (Sequence Parameter Set)
// 8: PPS (Picture Parameter Set)
nal_type == 5 || nal_type == 7 || nal_type == 8
};
let frame_type = if is_key { 0u8 } else { 1u8 };
payload.push(frame_type);
payload.extend_from_slice(&nal_data);
// Send to peers
let _ = frame_tx.send(payload.clone());
// Send to local web preview
let _ = broadcast_preview.send(WebMediaEvent::Video {
peer_id: "local".to_string(),
kind: kind.clone(),
data: payload,
});
// buffer now starts at what was end_idx (because of drain)
// drain removes items, so indexes shift.
// wait, drain(start..end) removes items. buffer automatically shrinks.
// so we loop again to see if there is ANOTHER start code at 0?
// Actually, `find_start_code` searches from 0.
// If we drained 0..end_idx, the next bytes are at 0.
}
}
let _ = child.kill().await;
Ok(())
}
/// Returns the byte offset of the first Annex B start code (00 00 01, or
/// 00 00 00 01 reported at its leading zero) in `data`, or `None` if absent.
fn find_start_code(data: &[u8]) -> Option<usize> {
    find_start_code_from(data, 0)
}
/// Scans `data` from `start` for an Annex B start code and returns its byte
/// offset, or `None` if no start code is found.
///
/// The scan matches the 3-byte pattern `00 00 01`. When the match is preceded
/// by an extra zero (the 4-byte form `00 00 00 01`), the offset of that
/// leading zero is returned instead — but never an offset before `start`.
fn find_start_code_from(data: &[u8], start: usize) -> Option<usize> {
    if data.len() < 3 {
        return None;
    }
    let mut i = start;
    while i + 2 < data.len() {
        if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 {
            // 4-byte start code: step back to its leading zero, staying
            // within the search window.
            if i > start && data[i - 1] == 0 {
                return Some(i - 1);
            }
            return Some(i);
        }
        i += 1;
    }
    None
}
// ---------------------------------------------------------------------------
// Wayland Capture Logic (ashpd)
// ---------------------------------------------------------------------------
/// Ask the XDG desktop portal for a screencast source and return the chosen
/// PipeWire node id plus a keeper that holds the portal session open.
///
/// Errors if the portal interaction fails or no stream is returned.
async fn select_wayland_source() -> Result<(u32, AshpdKeeper)> {
    let proxy = Screencast::new().await?;
    let session = proxy.create_session().await?;
    // Let the user pick a monitor or a window; cursor delivered as metadata,
    // no multiple selection, no persistence across sessions.
    proxy
        .select_sources(
            &session,
            CursorMode::Metadata,
            SourceType::Monitor | SourceType::Window,
            false,
            None,
            PersistMode::DoNot,
        )
        .await?;
    let request = proxy.start(&session, &ashpd::WindowIdentifier::default()).await?;
    let streams = request.response()?;
    let stream = streams
        .streams()
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pipewire stream returned from portal"))?;
    let node_id = stream.pipe_wire_node_id();
    Ok((node_id, AshpdKeeper { _proxy: proxy, _session: session }))
}
/// Capture a Wayland screen via FFmpeg's PipeWire input (portal `node_id`),
/// encode it, and fan the Annex B NAL units out to peers (`frame_tx`) and the
/// local web preview (`broadcast_preview`).
///
/// `_keeper` holds the portal session open for the lifetime of the capture.
/// Tries encoders hardware-first and keeps the first one that survives
/// startup; errors if all of them fail.
async fn run_pipewire_capture(
    node_id: u32,
    _keeper: AshpdKeeper,
    frame_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
    broadcast_preview: tokio::sync::broadcast::Sender<WebMediaEvent>,
    running: Arc<AtomicBool>,
) -> Result<()> {
    tracing::info!("Starting PipeWire capture for node: {}", node_id);
    // Encoder candidates, hardware preferred. PipeWire delivers raw frames,
    // so any of these can encode the input.
    struct EncoderConfig {
        name: &'static str,
        codec: &'static str,
        opts: Vec<&'static str>,
        format: &'static str, // raw stream format: "hevc" or "h264"
        filter: &'static str, // matching mp4->Annex B bitstream filter
    }
    let encoders = vec![
        EncoderConfig {
            name: "hevc_nvenc (Hardware)",
            codec: "hevc_nvenc",
            opts: vec!["-b:v", "1M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
            format: "hevc",
            filter: "hevc_mp4toannexb",
        },
        EncoderConfig {
            name: "h264_nvenc (Hardware Fallback)",
            codec: "h264_nvenc",
            opts: vec!["-b:v", "1.5M", "-g", "30", "-zerolatency", "1", "-preset", "p4"],
            format: "h264",
            filter: "h264_mp4toannexb",
        },
        EncoderConfig {
            name: "hevc_vaapi (VAAPI HEVC)",
            codec: "hevc_vaapi",
            opts: vec![
                "-vaapi_device", "/dev/dri/renderD128",
                "-vf", "format=nv12,hwupload,scale_vaapi=w=1280:h=720",
                "-b:v", "1M", "-g", "30"
            ],
            format: "hevc",
            filter: "hevc_mp4toannexb",
        },
        EncoderConfig {
            name: "libx264 (Software Fallback)",
            codec: "libx264",
            opts: vec!["-preset", "ultrafast", "-tune", "zerolatency", "-b:v", "1M", "-g", "30"],
            format: "h264",
            filter: "h264_mp4toannexb",
        },
    ];
    let mut final_child = None;
    let mut chosen_filter = "";
    for enc in &encoders {
        tracing::info!("Trying encoder: {}", enc.name);
        let mut cmd = Command::new("ffmpeg");
        cmd.kill_on_drop(true);
        cmd.args(&[
            "-f", "pipewire",
            "-framerate", "30",
            "-i", &node_id.to_string(),
        ]);
        // VAAPI does its own scaling via scale_vaapi in its opts.
        if !enc.name.contains("vaapi") {
            cmd.args(&["-vf", "scale=1280:720"]);
        }
        cmd.arg("-c:v").arg(enc.codec);
        cmd.args(&enc.opts);
        cmd.arg("-bsf:v").arg(enc.filter);
        cmd.arg("-f").arg(enc.format);
        cmd.arg("-");
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        match cmd.spawn() {
            Ok(mut child) => {
                // Give FFmpeg 500ms; an unsupported encoder exits immediately.
                tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
                if let Ok(Some(status)) = child.try_wait() {
                    // Log why we skipped instead of failing silently
                    // (consistent with run_ffmpeg_capture).
                    tracing::warn!("Encoder {} failed immediately with status: {}", enc.name, status);
                    continue;
                }
                final_child = Some(child);
                chosen_filter = enc.filter;
                break;
            }
            Err(e) => {
                tracing::warn!("Failed to spawn encoder {}: {}", enc.name, e);
                continue;
            }
        }
    }
    let mut child = match final_child {
        Some(c) => c,
        None => return Err(anyhow::anyhow!("All encoders failed for PipeWire")),
    };
    let stdout = child.stdout.take().expect("Failed to open stdout");
    let mut reader = BufReader::new(stdout);
    // Split the raw elementary stream into Annex B NAL units.
    let mut buffer = Vec::with_capacity(1024 * 1024);
    let mut temp_buf = [0u8; 4096];
    while running.load(Ordering::Relaxed) {
        let n = match reader.read(&mut temp_buf).await {
            Ok(0) => break, // EOF
            Ok(n) => n,
            Err(_) => break,
        };
        buffer.extend_from_slice(&temp_buf[0..n]);
        while let Some(start_idx) = find_start_code(&buffer) {
            let end_idx = match find_start_code_from(&buffer, start_idx + 4) {
                Some(next_start) => next_start,
                None => break, // Wait for more data
            };
            let nal_data = buffer.drain(start_idx..end_idx).collect::<Vec<u8>>();
            // 3-byte vs 4-byte start code.
            let start_code_len = if nal_data[2] == 1 { 3 } else { 4 };
            // Skip degenerate NALs (back-to-back start codes): indexing the
            // NAL header below would panic on them.
            if nal_data.len() <= start_code_len {
                continue;
            }
            let nal_header_byte = nal_data[start_code_len];
            let is_key = if chosen_filter.contains("hevc") {
                // HEVC: type in bits 1-6; 16-21 IRAP, 32-34 VPS/SPS/PPS.
                let nal_type = (nal_header_byte & 0x7E) >> 1;
                (nal_type >= 16 && nal_type <= 21) || (nal_type >= 32 && nal_type <= 34)
            } else {
                // H.264: type in low 5 bits; 5 IDR, 7 SPS, 8 PPS.
                let nal_type = nal_header_byte & 0x1F;
                nal_type == 5 || nal_type == 7 || nal_type == 8
            };
            let frame_type = if is_key { 0u8 } else { 1u8 };
            // Payload layout: [1-byte frame type][NAL incl. start code].
            let mut payload = Vec::with_capacity(1 + nal_data.len());
            payload.push(frame_type);
            payload.extend_from_slice(&nal_data);
            let _ = frame_tx.send(payload.clone());
            let _ = broadcast_preview.send(WebMediaEvent::Video {
                peer_id: "local".to_string(),
                kind: MediaKind::Screen,
                data: payload,
            });
        }
    }
    let _ = child.kill().await;
    Ok(())
}