From b35e2358e4008f03eeae60b08cbef68fbb46bc89 Mon Sep 17 00:00:00 2001 From: Mikayla Date: Tue, 26 Nov 2024 15:54:42 -0800 Subject: [PATCH] Rework audio playback to use the local device's sample rate --- Cargo.lock | 22 +- crates/live_kit_client/Cargo.toml | 1 + crates/live_kit_client/examples/test_app.rs | 2 +- crates/live_kit_client/src/live_kit_client.rs | 257 ++++++++++++++---- 4 files changed, 228 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f578fd021..f4331628fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -986,7 +986,7 @@ checksum = "2cca750b12e02c389c1694d35c16539f88b8bbaa5945934fdc1b41a776688589" dependencies = [ "async-native-tls", "async-std", - "async-tls", + "async-tls 0.13.0", "futures-io", "futures-util", "log", @@ -3137,6 +3137,17 @@ dependencies = [ "coreaudio-sys", ] +[[package]] +name = "coreaudio-rs" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ca07354f6d0640333ef95f48d460a4bcf34812a7e7967f9b44c728a8f37c28" +dependencies = [ + "bitflags 1.3.2", + "core-foundation-sys", + "coreaudio-sys", +] + [[package]] name = "coreaudio-sys" version = "0.2.16" @@ -3175,7 +3186,7 @@ source = "git+https://github.com/zed-industries/cpal?rev=fd8bc2fd39f1f5fdee5a069 dependencies = [ "alsa", "core-foundation-sys", - "coreaudio-rs", + "coreaudio-rs 0.11.3", "dasp_sample", "jni", "js-sys", @@ -7109,6 +7120,7 @@ dependencies = [ "async-trait", "collections", "core-foundation 0.9.4", + "coreaudio-rs 0.12.1", "cpal", "futures 0.3.31", "gpui", @@ -9693,8 +9705,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes 1.8.0", - "heck 0.4.1", - "itertools 0.10.5", + "heck 0.5.0", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -9727,7 +9739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.87", diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index 921c048f23..46454170ba 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -46,6 +46,7 @@ livekit.workspace = true [target.'cfg(target_os = "macos")'.dependencies] core-foundation.workspace = true +coreaudio-rs = "0.12.1" [dev-dependencies] collections = { workspace = true, features = ["test-support"] } diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index 1ffc407fad..9e2a449c52 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -73,7 +73,7 @@ fn main() { cx.spawn(|cx| async move { let mut windows = Vec::new(); - for i in 0..3 { + for i in 0..2 { let token = token::create( &live_kit_key, &live_kit_secret, diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index c5e51def20..28e9fe7fa9 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -11,7 +11,7 @@ use gpui::{ BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task, }; use parking_lot::Mutex; -use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc}; +use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread}; use util::{debug_panic, ResultExt as _}; #[cfg(not(target_os = "windows"))] use webrtc::{ @@ -35,8 +35,7 @@ pub enum AudioStream { _tasks: [Task>>; 2], }, Output { - _end_on_drop: std::sync::mpsc::Sender<()>, - _receive_task: Task<()>, + _task: Task<()>, }, } @@ -281,74 +280,110 @@ pub fn play_remote_audio_track( track: &track::RemoteAudioTrack, background_executor: &BackgroundExecutor, ) -> Result { - // TODO(mgsloan): use a concurrent queue that references the Cow slices the source gives us. - // - // TODO(mgsloan): put this on the channel? rationale is to not mix up samples that came from a - // different configuration. + let track = track.clone(); + let mut default_change_listener = OutputDeviceChangeListener::new()?; + let (output_device, output_config) = get_default_output()?; - use std::thread; - let buffer_mutex = Arc::new(Mutex::new(Vec::::new())); + let _task = background_executor.spawn({ + let background_executor = background_executor.clone(); + async move { + let (mut _recieve_task, mut _thread) = + start_audio_stream(output_config, output_device, &track, &background_executor); - let device = cpal::default_host() + while let Some(_) = default_change_listener.next().await { + let Some((output_device, output_config)) = get_default_output().log_err() else { + continue; + }; + + if let Ok(name) = output_device.name() { + log::info!("Using speaker: {}", name) + } else { + log::info!("Using speaker: ") + } + + (_recieve_task, _thread) = + start_audio_stream(output_config, output_device, &track, &background_executor); + } + + futures::future::pending::<()>().await; + } + }); + + Ok(AudioStream::Output { _task }) +} + +fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> { + let host = cpal::default_host(); + let output_device = host .default_output_device() - .context("no audio output device available")?; - let default_config = device - .default_output_config() - .context("no default configuration available for default output device")?; + .context("failed to read default output device")?; + let output_config = output_device.default_output_config()?; + Ok((output_device, output_config)) +} + +fn start_audio_stream( + output_config: cpal::SupportedStreamConfig, + output_device: cpal::Device, + track: &prelude::RemoteAudioTrack, + background_executor: &BackgroundExecutor, +) -> (Task<()>, std::sync::mpsc::Sender<()>) { + let buffer = Arc::new(Mutex::new(VecDeque::::new())); + let sample_rate = output_config.sample_rate(); let mut stream = NativeAudioStream::new( track.rtc_track(), - default_config.sample_rate().0 as i32, - default_config.channels() as i32, + sample_rate.0 as i32, + output_config.channels() as i32, ); - // let mut stream = NativeAudioStream::new(track.rtc_track(), 48000, 1); - - let _receive_task = background_executor.spawn({ - let buffer_mutex = buffer_mutex.clone(); + let recieve_task = background_executor.spawn({ + let buffer = buffer.clone(); async move { + const MS_OF_BUFFER: u32 = 50; + const MS_IN_SEC: u32 = 1000; while let Some(frame) = stream.next().await { - let buffer_size = frame.samples_per_channel * frame.num_channels; - debug_assert!(frame.data.len() == buffer_size as usize); + let frame_size = frame.samples_per_channel * frame.num_channels; + debug_assert!(frame.data.len() == frame_size as usize); - let mut buffer = buffer_mutex.lock(); - // TODO(mgsloan): max_size multiplier was arbitrarily chosen. - let max_size = (buffer_size * 5) as usize; + let buffer_size = + ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize; + + let mut buffer = buffer.lock(); let new_size = buffer.len() + frame.data.len(); - if new_size > max_size { - let drain_ix = new_size - max_size; - if drain_ix > buffer.len() { - buffer.clear(); - } else { - buffer.drain(..new_size - max_size); - } + if new_size > buffer_size { + let overflow = new_size - buffer_size; + buffer.drain(0..overflow); } - buffer.extend_from_slice(&frame.data); + + buffer.extend(frame.data.iter()); } } }); - let (_end_on_drop, end_on_drop_rx) = std::sync::mpsc::channel(); + // The _output_stream needs to be on it's own thread because it's !Send + // and we experienced a deadlock when it's created on the main thread. + let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>(); thread::spawn(move || { if cfg!(any(test, feature = "test-support")) { // Can't play audio in tests return; } - let mut _output_stream = device + let mut _output_stream = output_device .build_output_stream( - &default_config.config(), + &output_config.config(), { - let buffer_mutex = buffer_mutex.clone(); + let buffer = buffer.clone(); move |data, _info| { - let mut buffer = buffer_mutex.lock(); - while data.len() > buffer.len() { - drop(buffer); - std::hint::spin_loop(); - buffer = buffer_mutex.lock(); + let mut buffer = buffer.lock(); + if buffer.len() < data.len() { + data.fill(0); + } else { + // SAFETY: We know that buffer has at least data.len() values in it. + // because we just checked + let mut drain = buffer.drain(..data.len()); + data.fill_with(|| unsafe { drain.next().unwrap_unchecked() }); } - data.copy_from_slice(&buffer[..data.len()]); - buffer.drain(..data.len()); } }, |error| log::error!("error playing audio track: {:?}", error), @@ -360,10 +395,7 @@ pub fn play_remote_audio_track( end_on_drop_rx.recv().ok(); }); - Ok(AudioStream::Output { - _end_on_drop, - _receive_task, - }) + (recieve_task, thread) } #[cfg(target_os = "windows")] @@ -419,3 +451,132 @@ fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option Option> { None as Option> } + +trait DefaultOutputDeviceChangeListenerApi: Stream + Sized { + fn new() -> Result; +} + +#[cfg(target_os = "macos")] +mod macos { + + use std::sync::Arc; + + use coreaudio::sys::{ + kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster, + kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject, AudioObjectAddPropertyListener, + AudioObjectID, AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus, + }; + use futures::{channel::mpsc::UnboundedReceiver, StreamExt}; + use util::defer; + + use crate::DefaultOutputDeviceChangeListenerApi; + + /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15 + pub struct CoreAudioDefaultDeviceChangeListener { + rx: UnboundedReceiver<()>, + callback: Box, + } + + trait _AssertSend: Send {} + impl _AssertSend for CoreAudioDefaultDeviceChangeListener {} + + struct PropertyListenerCallbackWrapper(Box); + + unsafe extern "C" fn property_listener_handler_shim( + _: AudioObjectID, + _: u32, + _: *const AudioObjectPropertyAddress, + callback: *mut ::std::os::raw::c_void, + ) -> OSStatus { + let wrapper = callback as *mut PropertyListenerCallbackWrapper; + (*wrapper).0(); + 0 + } + + impl DefaultOutputDeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener { + fn new() -> gpui::Result { + let (tx, rx) = futures::channel::mpsc::unbounded(); + + let defer = Arc::new(defer(|| println!("oh no"))); + let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || { + let _ = defer.clone(); + tx.unbounded_send(()).ok(); + }))); + + unsafe { + coreaudio::Error::from_os_status(AudioObjectAddPropertyListener( + kAudioObjectSystemObject, + &AudioObjectPropertyAddress { + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMaster, + }, + Some(property_listener_handler_shim), + &*callback as *const _ as *mut _, + ))?; + } + + Ok(Self { rx, callback }) + } + } + + impl Drop for CoreAudioDefaultDeviceChangeListener { + fn drop(&mut self) { + unsafe { + AudioObjectRemovePropertyListener( + kAudioObjectSystemObject, + &AudioObjectPropertyAddress { + mSelector: kAudioHardwarePropertyDefaultOutputDevice, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMaster, + }, + Some(property_listener_handler_shim), + &*self.callback as *const _ as *mut _, + ); + } + } + } + + impl futures::Stream for CoreAudioDefaultDeviceChangeListener { + type Item = (); + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.rx.poll_next_unpin(cx) + } + } +} + +#[cfg(target_os = "macos")] +type OutputDeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener; + +#[cfg(not(target_os = "macos"))] +mod noop_change_listener { + use std::task::Poll; + + use crate::DefaultOutputDeviceChangeListenerApi; + + pub struct NoopOutputDeviceChangelistener {} + + impl DefaultOutputDeviceChangeListenerApi for NoopOutputDeviceChangelistener { + fn new() -> anyhow::Result { + Ok(NoopOutputDeviceChangelistener {}) + } + } + + impl futures::Stream for NoopOutputDeviceChangelistener { + type Item = (); + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Pending + } + } +} + +#[cfg(not(target_os = "macos"))] +type OutputDeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;