Files
p2p-chat/src/media/voice.rs
2026-02-12 21:06:34 +03:00

303 lines
11 KiB
Rust

//! Voice capture and playback using PipeWire + Audiopus (via Songbird dependency).
//!
//! Architecture:
//! - Capture runs on a dedicated OS thread (PipeWire main loop).
//! - PipeWire process callback copies PCM → crossbeam channel.
//! - Async task reads from channel, encodes with Opus, sends over QUIC.
//! - Playback: receives Opus packets from QUIC, decodes, feeds to PipeWire output.
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;
use crate::media::WebMediaEvent;
use crate::protocol::{decode_framed, MediaStreamMessage};
use anyhow::Result;
use postcard;
// Use audiopus types directly
use audiopus::{
coder::Decoder as OpusDecoder, coder::Encoder as OpusEncoder, Application, Bitrate, Channels,
SampleRate,
};
// Constants
const SAMPLE_RATE_VAL: i32 = 48000;
const FRAME_SIZE_MS: u32 = 20; // 20ms
const FRAME_SIZE_SAMPLES: usize = (SAMPLE_RATE_VAL as usize * FRAME_SIZE_MS as usize) / 1000;
/// Main voice chat coordination.
pub struct VoiceChat {
running: Arc<AtomicBool>,
capture_thread: Option<thread::JoinHandle<()>>,
tasks: Vec<tokio::task::JoinHandle<()>>,
datagram_decoders: std::collections::HashMap<iroh::EndpointId, OpusDecoder>,
}
impl VoiceChat {
/// Start voice chat session (Web Version).
/// Uses browser for capture/playback handling implicitly via `MediaState` channels,
/// but here we handle the NETWORK side encoding/decoding.
pub fn start_web(
net: crate::net::NetworkManager,
peers: Vec<iroh::EndpointId>, // Multiple peers
mic_rx: tokio::sync::broadcast::Receiver<Vec<f32>>,
_broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
mic_bitrate: Arc<AtomicU32>,
) -> Result<Self> {
let running = Arc::new(AtomicBool::new(true));
let mut tasks = Vec::new();
// Spawn a single task to encode once and send to all peers.
let sender_running = running.clone();
let net_clone = net.clone();
let mic_bitrate_clone = mic_bitrate.clone();
let sender_task = tokio::spawn(async move {
if let Err(e) = run_opis_sender_web_multi(
net_clone,
peers,
mic_rx,
sender_running,
mic_bitrate_clone,
)
.await
{
tracing::error!("Voice sender failed: {}", e);
}
});
tasks.push(sender_task);
Ok(Self {
running,
capture_thread: None,
tasks,
datagram_decoders: std::collections::HashMap::new(),
})
}
/// Stop voice chat.
pub fn stop(&mut self) {
self.running.store(false, Ordering::SeqCst);
for task in &self.tasks {
task.abort();
}
self.tasks.clear();
if let Some(t) = self.capture_thread.take() {
t.thread().unpark(); // Wake up if sleeping
let _ = t.join();
}
}
/// Handle incoming audio stream (Web Version).
pub async fn handle_incoming_audio_web(
from: iroh::EndpointId,
message: MediaStreamMessage,
mut recv: iroh::endpoint::RecvStream,
broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
) -> Result<()> {
// Initialize Opus decoder
let mut decoder = OpusDecoder::new(SampleRate::Hz48000, Channels::Mono)
.map_err(|e| anyhow::anyhow!("Failed to create Opus decoder: {:?}", e))?;
// Process start message
match message {
MediaStreamMessage::AudioStart { .. } => {
tracing::info!("Incoming voice stream started (web) from {}", from);
}
_ => anyhow::bail!("Expected AudioStart"),
}
let mut decode_buf = vec![0f32; FRAME_SIZE_SAMPLES];
loop {
let msg: MediaStreamMessage = match decode_framed(&mut recv).await {
Ok(m) => m,
Err(_) => break, // EOF
};
match msg {
MediaStreamMessage::AudioData { opus_data, .. } => {
// Removed `channels` field usage if it existed
match decoder.decode_float(Some(&opus_data), &mut decode_buf, false) {
Ok(len) => {
let samples = decode_buf[..len].to_vec();
// Broadcast to web
let short_id: String = format!("{}", from).chars().take(8).collect();
let _ = broadcast_tx.send(WebMediaEvent::Audio {
peer_id: short_id,
data: samples,
});
}
Err(e) => {
tracing::warn!("Opus decode error: {:?}", e);
}
}
}
MediaStreamMessage::AudioStop => {
tracing::info!("Peer stopped audio");
break;
}
_ => {}
}
}
Ok(())
}
pub fn handle_datagram(
&mut self,
from: iroh::EndpointId,
data: bytes::Bytes,
broadcast_tx: tokio::sync::broadcast::Sender<WebMediaEvent>,
) {
// tracing::info!("Received datagram from {} ({} bytes)", from, data.len());
match postcard::from_bytes::<MediaStreamMessage>(&data) {
Ok(MediaStreamMessage::AudioData {
opus_data,
sequence,
}) => {
if sequence % 50 == 0 {
tracing::info!("Received AudioData seq {} from {}", sequence, from);
}
let decoder = self.datagram_decoders.entry(from).or_insert_with(|| {
tracing::info!("Creating new OpusDecoder for {}", from);
OpusDecoder::new(SampleRate::Hz48000, Channels::Mono)
.expect("Failed to create decoder")
});
// Max frame size is ~120ms (5760 samples). Use safe buffer.
let mut pcm = vec![0.0f32; 5760];
match decoder.decode_float(Some(&opus_data), &mut pcm, false) {
Ok(len) => {
let samples = pcm[..len].to_vec();
let short_id: String = format!("{}", from).chars().take(8).collect();
let _ = broadcast_tx.send(WebMediaEvent::Audio {
peer_id: short_id,
data: samples,
});
}
Err(e) => tracing::warn!("Opus decode error: {:?}", e),
}
}
Ok(_) => {} // Ignore non-audio datagrams
Err(e) => tracing::warn!("Failed to deserialize datagram: {}", e),
}
}
}
// ---------------------------------------------------------------------------
// Opus sender — encodes PCM and sends over QUIC (Multi-Peer)
// ---------------------------------------------------------------------------
async fn run_opis_sender_web_multi(
network_manager: crate::net::NetworkManager,
peers: Vec<iroh::EndpointId>,
mut input_rx: tokio::sync::broadcast::Receiver<Vec<f32>>,
running: Arc<AtomicBool>,
mic_bitrate: Arc<AtomicU32>,
) -> Result<()> {
if peers.is_empty() {
return Ok(());
}
// Connect to all peers to get Connection handles
let mut connections = Vec::new();
for peer in peers {
// We use VOICE_ALPN, but for datagrams ALPN matters for connection establishment.
match network_manager
.endpoint
.connect(peer, crate::net::VOICE_ALPN)
.await
{
Ok(conn) => {
connections.push(conn);
}
Err(e) => {
tracing::warn!("Failed to connect to {}: {}", peer, e);
}
}
}
if connections.is_empty() {
tracing::warn!("No reachable peers for voice chat");
return Ok(());
}
let mut encoder = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
.map_err(|e| anyhow::anyhow!("Failed to create Opus encoder: {:?}", e))?;
// Set initial bitrate
let current_bitrate = mic_bitrate.load(Ordering::Relaxed);
encoder
.set_bitrate(Bitrate::BitsPerSecond(current_bitrate as i32))
.map_err(|e| anyhow::anyhow!("Failed to set bitrate: {:?}", e))?;
// Opus frame size: 20ms at 48kHz = 960 samples
let frame_size = FRAME_SIZE_SAMPLES;
let mut pcm_buffer: Vec<f32> = Vec::with_capacity(frame_size * 2);
let mut opus_buffer = vec![0u8; 1500]; // MTU-ish
let mut sequence: u64 = 0;
tracing::info!("Starting voice sender loop for {} peers", connections.len());
while running.load(Ordering::Relaxed) {
// ... bitrate check ...
// Receive PCM from Web
match input_rx.recv().await {
Ok(samples) => {
// tracing::trace!("Received {} audio samples from web", samples.len());
pcm_buffer.extend_from_slice(&samples);
// Process 20ms chunks
while pcm_buffer.len() >= frame_size {
let chunk: Vec<f32> = pcm_buffer.drain(0..frame_size).collect();
match encoder.encode_float(&chunk, &mut opus_buffer) {
Ok(len) => {
let packet = opus_buffer[..len].to_vec();
let msg = MediaStreamMessage::AudioData {
sequence,
opus_data: packet,
};
sequence = sequence.wrapping_add(1);
// Serialize for Datagram
match postcard::to_allocvec(&msg) {
Ok(data) => {
let bytes = bytes::Bytes::from(data);
let mut sent_count = 0;
for (_i, conn) in connections.iter_mut().enumerate() {
if let Err(e) = conn.send_datagram(bytes.clone()) {
tracing::debug!("Failed to send datagram: {}", e);
} else {
sent_count += 1;
}
}
if sent_count > 0 && sequence % 50 == 0 {
tracing::info!(
"Sent audio datagram seq {} to {} peers",
sequence,
sent_count
);
}
}
Err(e) => tracing::error!("Serialization error: {}", e),
}
}
Err(e) => {
tracing::error!("Opus encode error: {:?}", e);
}
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
pcm_buffer.clear();
}
}
}
Ok(())
}