Rework audio playback to use the local device's sample rate

This commit is contained in:
Mikayla
2024-11-26 15:54:42 -08:00
parent f6bfaaba2c
commit b35e2358e4
4 changed files with 228 additions and 54 deletions

22
Cargo.lock generated
View File

@@ -986,7 +986,7 @@ checksum = "2cca750b12e02c389c1694d35c16539f88b8bbaa5945934fdc1b41a776688589"
dependencies = [
"async-native-tls",
"async-std",
"async-tls",
"async-tls 0.13.0",
"futures-io",
"futures-util",
"log",
@@ -3137,6 +3137,17 @@ dependencies = [
"coreaudio-sys",
]
[[package]]
name = "coreaudio-rs"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ca07354f6d0640333ef95f48d460a4bcf34812a7e7967f9b44c728a8f37c28"
dependencies = [
"bitflags 1.3.2",
"core-foundation-sys",
"coreaudio-sys",
]
[[package]]
name = "coreaudio-sys"
version = "0.2.16"
@@ -3175,7 +3186,7 @@ source = "git+https://github.com/zed-industries/cpal?rev=fd8bc2fd39f1f5fdee5a069
dependencies = [
"alsa",
"core-foundation-sys",
"coreaudio-rs",
"coreaudio-rs 0.11.3",
"dasp_sample",
"jni",
"js-sys",
@@ -7109,6 +7120,7 @@ dependencies = [
"async-trait",
"collections",
"core-foundation 0.9.4",
"coreaudio-rs 0.12.1",
"cpal",
"futures 0.3.31",
"gpui",
@@ -9693,8 +9705,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes 1.8.0",
"heck 0.4.1",
"itertools 0.10.5",
"heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
"once_cell",
@@ -9727,7 +9739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.87",

View File

@@ -46,6 +46,7 @@ livekit.workspace = true
[target.'cfg(target_os = "macos")'.dependencies]
core-foundation.workspace = true
coreaudio-rs = "0.12.1"
[dev-dependencies]
collections = { workspace = true, features = ["test-support"] }

View File

@@ -73,7 +73,7 @@ fn main() {
cx.spawn(|cx| async move {
let mut windows = Vec::new();
for i in 0..3 {
for i in 0..2 {
let token = token::create(
&live_kit_key,
&live_kit_secret,

View File

@@ -11,7 +11,7 @@ use gpui::{
BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
#[cfg(not(target_os = "windows"))]
use webrtc::{
@@ -35,8 +35,7 @@ pub enum AudioStream {
_tasks: [Task<Result<Option<()>>>; 2],
},
Output {
_end_on_drop: std::sync::mpsc::Sender<()>,
_receive_task: Task<()>,
_task: Task<()>,
},
}
@@ -281,74 +280,110 @@ pub fn play_remote_audio_track(
track: &track::RemoteAudioTrack,
background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
// TODO(mgsloan): use a concurrent queue that references the Cow slices the source gives us.
//
// TODO(mgsloan): put this on the channel? rationale is to not mix up samples that came from a
// different configuration.
let track = track.clone();
let mut default_change_listener = OutputDeviceChangeListener::new()?;
let (output_device, output_config) = get_default_output()?;
use std::thread;
let buffer_mutex = Arc::new(Mutex::new(Vec::<i16>::new()));
let _task = background_executor.spawn({
let background_executor = background_executor.clone();
async move {
let (mut _recieve_task, mut _thread) =
start_audio_stream(output_config, output_device, &track, &background_executor);
let device = cpal::default_host()
while let Some(_) = default_change_listener.next().await {
let Some((output_device, output_config)) = get_default_output().log_err() else {
continue;
};
if let Ok(name) = output_device.name() {
log::info!("Using speaker: {}", name)
} else {
log::info!("Using speaker: <unknown>")
}
(_recieve_task, _thread) =
start_audio_stream(output_config, output_device, &track, &background_executor);
}
futures::future::pending::<()>().await;
}
});
Ok(AudioStream::Output { _task })
}
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
let host = cpal::default_host();
let output_device = host
.default_output_device()
.context("no audio output device available")?;
let default_config = device
.default_output_config()
.context("no default configuration available for default output device")?;
.context("failed to read default output device")?;
let output_config = output_device.default_output_config()?;
Ok((output_device, output_config))
}
fn start_audio_stream(
output_config: cpal::SupportedStreamConfig,
output_device: cpal::Device,
track: &prelude::RemoteAudioTrack,
background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
let sample_rate = output_config.sample_rate();
let mut stream = NativeAudioStream::new(
track.rtc_track(),
default_config.sample_rate().0 as i32,
default_config.channels() as i32,
sample_rate.0 as i32,
output_config.channels() as i32,
);
// let mut stream = NativeAudioStream::new(track.rtc_track(), 48000, 1);
let _receive_task = background_executor.spawn({
let buffer_mutex = buffer_mutex.clone();
let recieve_task = background_executor.spawn({
let buffer = buffer.clone();
async move {
const MS_OF_BUFFER: u32 = 50;
const MS_IN_SEC: u32 = 1000;
while let Some(frame) = stream.next().await {
let buffer_size = frame.samples_per_channel * frame.num_channels;
debug_assert!(frame.data.len() == buffer_size as usize);
let frame_size = frame.samples_per_channel * frame.num_channels;
debug_assert!(frame.data.len() == frame_size as usize);
let mut buffer = buffer_mutex.lock();
// TODO(mgsloan): max_size multiplier was arbitrarily chosen.
let max_size = (buffer_size * 5) as usize;
let buffer_size =
((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;
let mut buffer = buffer.lock();
let new_size = buffer.len() + frame.data.len();
if new_size > max_size {
let drain_ix = new_size - max_size;
if drain_ix > buffer.len() {
buffer.clear();
} else {
buffer.drain(..new_size - max_size);
}
if new_size > buffer_size {
let overflow = new_size - buffer_size;
buffer.drain(0..overflow);
}
buffer.extend_from_slice(&frame.data);
buffer.extend(frame.data.iter());
}
}
});
let (_end_on_drop, end_on_drop_rx) = std::sync::mpsc::channel();
// The _output_stream needs to be on it's own thread because it's !Send
// and we experienced a deadlock when it's created on the main thread.
let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
thread::spawn(move || {
if cfg!(any(test, feature = "test-support")) {
// Can't play audio in tests
return;
}
let mut _output_stream = device
let mut _output_stream = output_device
.build_output_stream(
&default_config.config(),
&output_config.config(),
{
let buffer_mutex = buffer_mutex.clone();
let buffer = buffer.clone();
move |data, _info| {
let mut buffer = buffer_mutex.lock();
while data.len() > buffer.len() {
drop(buffer);
std::hint::spin_loop();
buffer = buffer_mutex.lock();
let mut buffer = buffer.lock();
if buffer.len() < data.len() {
data.fill(0);
} else {
// SAFETY: We know that buffer has at least data.len() values in it.
// because we just checked
let mut drain = buffer.drain(..data.len());
data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
}
data.copy_from_slice(&buffer[..data.len()]);
buffer.drain(..data.len());
}
},
|error| log::error!("error playing audio track: {:?}", error),
@@ -360,10 +395,7 @@ pub fn play_remote_audio_track(
end_on_drop_rx.recv().ok();
});
Ok(AudioStream::Output {
_end_on_drop,
_receive_task,
})
(recieve_task, thread)
}
#[cfg(target_os = "windows")]
@@ -419,3 +451,132 @@ fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
None as Option<Box<dyn VideoBuffer>>
}
trait DefaultOutputDeviceChangeListenerApi: Stream<Item = ()> + Sized {
fn new() -> Result<Self>;
}
#[cfg(target_os = "macos")]
mod macos {
use std::sync::Arc;
use coreaudio::sys::{
kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject, AudioObjectAddPropertyListener,
AudioObjectID, AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
};
use futures::{channel::mpsc::UnboundedReceiver, StreamExt};
use util::defer;
use crate::DefaultOutputDeviceChangeListenerApi;
/// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
pub struct CoreAudioDefaultDeviceChangeListener {
rx: UnboundedReceiver<()>,
callback: Box<PropertyListenerCallbackWrapper>,
}
trait _AssertSend: Send {}
impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
unsafe extern "C" fn property_listener_handler_shim(
_: AudioObjectID,
_: u32,
_: *const AudioObjectPropertyAddress,
callback: *mut ::std::os::raw::c_void,
) -> OSStatus {
let wrapper = callback as *mut PropertyListenerCallbackWrapper;
(*wrapper).0();
0
}
impl DefaultOutputDeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
fn new() -> gpui::Result<Self> {
let (tx, rx) = futures::channel::mpsc::unbounded();
let defer = Arc::new(defer(|| println!("oh no")));
let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
let _ = defer.clone();
tx.unbounded_send(()).ok();
})));
unsafe {
coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
kAudioObjectSystemObject,
&AudioObjectPropertyAddress {
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
},
Some(property_listener_handler_shim),
&*callback as *const _ as *mut _,
))?;
}
Ok(Self { rx, callback })
}
}
impl Drop for CoreAudioDefaultDeviceChangeListener {
fn drop(&mut self) {
unsafe {
AudioObjectRemovePropertyListener(
kAudioObjectSystemObject,
&AudioObjectPropertyAddress {
mSelector: kAudioHardwarePropertyDefaultOutputDevice,
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
},
Some(property_listener_handler_shim),
&*self.callback as *const _ as *mut _,
);
}
}
}
impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
type Item = ();
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.rx.poll_next_unpin(cx)
}
}
}
#[cfg(target_os = "macos")]
type OutputDeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
use std::task::Poll;
use crate::DefaultOutputDeviceChangeListenerApi;
pub struct NoopOutputDeviceChangelistener {}
impl DefaultOutputDeviceChangeListenerApi for NoopOutputDeviceChangelistener {
fn new() -> anyhow::Result<Self> {
Ok(NoopOutputDeviceChangelistener {})
}
}
impl futures::Stream for NoopOutputDeviceChangelistener {
type Item = ();
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
}
#[cfg(not(target_os = "macos"))]
type OutputDeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;