Skip to content
Open
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `DeviceBusy` error variant to `SupportedStreamConfigsError`, `DefaultStreamConfigError`, and
`BuildStreamError` for retryable device access errors (EBUSY, EAGAIN).
- `StreamConfig` now implements `Copy`.
- `StreamTrait::buffer_size` method to query the callback buffer size.
- **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API.
- **PipeWire**: New host for Linux and some BSDs using the PipeWire API.

Expand All @@ -25,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **AAudio**: `supported_input_configs` and `supported_output_configs` now return an error for
direction-mismatched devices (e.g. querying input configs on an output-only device) instead of
silently returning an empty list.
- **AAudio**: Buffers with default sizes are now dynamically tuned.
- **ASIO**: `Device::driver`, `asio_streams`, and `current_callback_flag` are no longer `pub`.

### Fixed
Expand Down
22 changes: 21 additions & 1 deletion src/host/aaudio/java_interface/audio_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{
utils::{
get_context, get_property, get_system_service, with_attached, JNIEnv, JObject, JResult,
get_context, get_property, get_system_property, get_system_service, with_attached, JNIEnv,
JObject, JResult,
},
AudioManager, Context,
};
Expand All @@ -13,6 +14,14 @@ impl AudioManager {
with_attached(context, |env, context| get_frames_per_buffer(env, &context))
.map_err(|error| error.to_string())
}

/// Get the AAudio mixer burst count from system property
pub fn get_mixer_bursts() -> Result<i32, String> {
let context = get_context();

with_attached(context, |env, _context| get_mixer_bursts(env))
.map_err(|error| error.to_string())
}
}

fn get_frames_per_buffer<'j>(env: &mut JNIEnv<'j>, context: &JObject<'j>) -> JResult<i32> {
Expand All @@ -31,3 +40,14 @@ fn get_frames_per_buffer<'j>(env: &mut JNIEnv<'j>, context: &JObject<'j>) -> JRe
.parse::<i32>()
.map_err(|_| jni::errors::Error::JniCall(jni::errors::JniError::Unknown))
}

fn get_mixer_bursts<'j>(env: &mut JNIEnv<'j>) -> JResult<i32> {
let mixer_bursts = get_system_property(env, "aaudio.mixer_bursts", "2")?;

let mixer_bursts_string = String::from(env.get_string(&mixer_bursts)?);

// TODO: Use jni::errors::Error::ParseFailed instead of jni::errors::Error::JniCall once jni > v0.21.1 is released
mixer_bursts_string
.parse::<i32>()
.map_err(|_| jni::errors::Error::JniCall(jni::errors::JniError::Unknown))
}
20 changes: 20 additions & 0 deletions src/host/aaudio/java_interface/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ pub fn get_property<'j>(
call_method_string_arg_ret_string(env, subject, "getProperty", name)
}

/// Read an Android system property
pub fn get_system_property<'j>(
env: &mut JNIEnv<'j>,
name: &str,
default_value: &str,
) -> JResult<JString<'j>> {
Ok(env
.call_static_method(
"android/os/SystemProperties",
"get",
"(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;",
&[
(&env.new_string(name)?).into(),
(&env.new_string(default_value)?).into(),
],
)?
.l()?
.into())
}

pub fn get_devices<'j>(
env: &mut JNIEnv<'j>,
subject: &JObject<'j>,
Expand Down
149 changes: 119 additions & 30 deletions src/host/aaudio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::cmp;
use std::convert::TryInto;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
Expand Down Expand Up @@ -131,9 +132,9 @@ pub struct Device(Option<AudioDeviceInfo>);
/// - The pointer in AudioStream (NonNull<AAudioStreamStruct>) is valid for the lifetime
/// of the stream and AAudio C API functions are thread-safe at the C level
#[derive(Clone)]
pub enum Stream {
Input(Arc<Mutex<AudioStream>>),
Output(Arc<Mutex<AudioStream>>),
pub struct Stream {
inner: Arc<Mutex<AudioStream>>,
direction: DeviceDirection,
}

// SAFETY: AudioStream can be safely sent between threads. The AAudio C API is thread-safe
Expand All @@ -148,6 +149,14 @@ unsafe impl Sync for Stream {}
crate::assert_stream_send!(Stream);
crate::assert_stream_sync!(Stream);

/// State for dynamic buffer tuning on output streams.
#[derive(Default)]
struct BufferTuningState {
previous_underrun_count: AtomicI32,
capacity: AtomicI32,
mixer_bursts: AtomicI32,
}

pub use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs};
pub type Devices = std::vec::IntoIter<Device>;

Expand Down Expand Up @@ -277,14 +286,16 @@ fn configure_for_device(
};
builder = builder.sample_rate(config.sample_rate.try_into().unwrap());

// Note: Buffer size validation is not needed - the native AAudio API validates buffer sizes
// when `open_stream()` is called.
match &config.buffer_size {
BufferSize::Default => builder,
BufferSize::Fixed(size) => builder
.frames_per_data_callback(*size as i32)
.buffer_capacity_in_frames((*size * 2) as i32), // Double-buffering
// Following the pattern from Oboe and Google's AAudio, we let AAudio choose the optimal
// callback size dynamically by default. See
// - https://developer.android.com/ndk/reference/group/audio#aaudiostreambuilder_setframesperdatacallback
// - https://developer.android.com/ndk/guides/audio/audio-latency#buffer-size
if let BufferSize::Fixed(size) = config.buffer_size {
// Only for fixed sizes, the user explicitly wants control over the callback size.
builder = builder.frames_per_data_callback(size as i32);
}

builder
}

fn build_input_stream<D, E>(
Expand Down Expand Up @@ -326,11 +337,15 @@ where
(error_callback)(StreamError::from(error))
}))
.open_stream()?;

// SAFETY: Stream implements Send + Sync (see unsafe impl below). Arc<Mutex<AudioStream>>
// is safe because the Mutex provides exclusive access and AudioStream's thread safety
// is documented in the AAudio C API.
#[allow(clippy::arc_with_non_send_sync)]
Ok(Stream::Input(Arc::new(Mutex::new(stream))))
Ok(Stream {
inner: Arc::new(Mutex::new(stream)),
direction: DeviceDirection::Input,
})
}

fn build_output_stream<D, E>(
Expand All @@ -348,8 +363,14 @@ where
let builder = configure_for_device(builder, device, config);
let created = Instant::now();
let channel_count = config.channels as i32;
let tune_dynamically = config.buffer_size == BufferSize::Default;

let tuning = Arc::new(BufferTuningState::default());
let tuning_for_callback = tuning.clone();

let stream = builder
.data_callback(Box::new(move |stream, data, num_frames| {
// Deliver audio data to user callback
let cb_info = OutputCallbackInfo {
timestamp: OutputStreamTimestamp {
callback: to_stream_instant(created.elapsed()),
Expand All @@ -366,17 +387,79 @@ where
},
&cb_info,
);

// Dynamic buffer tuning for output streams
// See: https://developer.android.com/ndk/guides/audio/aaudio/aaudio#tuning-buffers
if tune_dynamically {
let underrun_count = stream.x_run_count();
let previous = tuning_for_callback
.previous_underrun_count
.load(Ordering::Relaxed);

if underrun_count > previous {
// The number of frames per burst can vary dynamically
let mut burst_size = stream.frames_per_burst();
if burst_size <= 0 {
burst_size = 256; // fallback from AAudio documentation
} else if burst_size < 16 {
burst_size = 16; // floor from Oboe
}

let new_mixer_bursts = tuning_for_callback
.mixer_bursts
.load(Ordering::Relaxed)
.saturating_add(1);
let mut buffer_size = burst_size * new_mixer_bursts;

let buffer_capacity = tuning_for_callback.capacity.load(Ordering::Relaxed);
if buffer_size > buffer_capacity {
buffer_size = buffer_capacity;
}

if stream.set_buffer_size_in_frames(buffer_size).is_ok() {
tuning_for_callback
.mixer_bursts
.store(new_mixer_bursts, Ordering::Relaxed);
}

tuning_for_callback
.previous_underrun_count
.store(underrun_count, Ordering::Relaxed);
}
}

ndk::audio::AudioCallbackResult::Continue
}))
.error_callback(Box::new(move |_stream, error| {
(error_callback)(StreamError::from(error))
}))
.open_stream()?;

// After stream opens, query and cache the values
let capacity = stream.buffer_capacity_in_frames();
tuning.capacity.store(capacity, Ordering::Relaxed);

let mixer_bursts = match AudioManager::get_mixer_bursts() {
Ok(bursts) => bursts,
Err(_) => {
let burst_size = stream.frames_per_burst();
if burst_size > 0 {
stream.buffer_size_in_frames() / burst_size
} else {
0 // defer to dynamic tuning
}
}
};
tuning.mixer_bursts.store(mixer_bursts, Ordering::Relaxed);

// SAFETY: Stream implements Send + Sync (see unsafe impl below). Arc<Mutex<AudioStream>>
// is safe because the Mutex provides exclusive access and AudioStream's thread safety
// is documented in the AAudio C API.
#[allow(clippy::arc_with_non_send_sync)]
Ok(Stream::Output(Arc::new(Mutex::new(stream))))
Ok(Stream {
inner: Arc::new(Mutex::new(stream)),
direction: DeviceDirection::Output,
})
}

impl DeviceTrait for Device {
Expand Down Expand Up @@ -598,31 +681,37 @@ impl DeviceTrait for Device {

impl StreamTrait for Stream {
fn play(&self) -> Result<(), PlayStreamError> {
match self {
Self::Input(stream) => stream
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from),
Self::Output(stream) => stream
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from),
}
self.inner
.lock()
.unwrap()
.request_start()
.map_err(PlayStreamError::from)
}

fn pause(&self) -> Result<(), PauseStreamError> {
match self {
Self::Input(_) => Err(BackendSpecificError {
description: "Pause called on the input stream.".to_owned(),
}
.into()),
Self::Output(stream) => stream
match self.direction {
DeviceDirection::Output => self
.inner
.lock()
.unwrap()
.request_pause()
.map_err(PauseStreamError::from),
_ => Err(BackendSpecificError {
description: "Pause only supported on output streams.".to_owned(),
}
.into()),
}
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
let stream = self.inner.lock().ok()?;

// If frames_per_data_callback was not explicitly set (returning 0),
// fall back to the burst size as that's what AAudio uses by default.
let frames = match stream.frames_per_data_callback() {
Some(size) if size > 0 => size,
_ => stream.frames_per_burst(),
};
Some(frames as crate::FrameCount)
}
}
3 changes: 3 additions & 0 deletions src/host/alsa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,9 @@ impl StreamTrait for Stream {
self.inner.channel.pause(true).ok();
Ok(())
}
fn buffer_size(&self) -> Option<FrameCount> {
Some(self.inner.period_frames as FrameCount)
}
}

// Convert ALSA frames to FrameCount, clamping to valid range.
Expand Down
4 changes: 4 additions & 0 deletions src/host/asio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,8 @@ impl StreamTrait for Stream {
fn pause(&self) -> Result<(), PauseStreamError> {
Stream::pause(self)
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
Stream::buffer_size(self)
}
}
9 changes: 9 additions & 0 deletions src/host/asio/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ impl Stream {
self.playing.store(false, Ordering::SeqCst);
Ok(())
}

pub fn buffer_size(&self) -> Option<crate::FrameCount> {
let streams = self.asio_streams.lock().ok()?;
streams
.output
.as_ref()
.or(streams.input.as_ref())
.map(|s| s.buffer_size as crate::FrameCount)
}
}

impl Device {
Expand Down
5 changes: 4 additions & 1 deletion src/host/coreaudio/ios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,14 @@ impl StreamTrait for Stream {
let err = BackendSpecificError { description };
return Err(err.into());
}

stream.playing = false;
}
Ok(())
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
Some(get_device_buffer_frames() as crate::FrameCount)
}
}

struct StreamInner {
Expand Down
4 changes: 3 additions & 1 deletion src/host/coreaudio/macos/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,9 @@ fn setup_callback_vars(
///
/// Buffer frame size is a device-level property that always uses Scope::Global + Element::Output,
/// regardless of whether the audio unit is configured for input or output streams.
fn get_device_buffer_frame_size(audio_unit: &AudioUnit) -> Result<usize, coreaudio::Error> {
pub(crate) fn get_device_buffer_frame_size(
audio_unit: &AudioUnit,
) -> Result<usize, coreaudio::Error> {
// Device-level property: always use Scope::Global + Element::Output
// This is consistent with how we set the buffer size and query the buffer size range
let frames: u32 = audio_unit.get_property(
Expand Down
8 changes: 8 additions & 0 deletions src/host/coreaudio/macos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ impl StreamTrait for Stream {

stream.pause()
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
let stream = self.inner.lock().ok()?;

device::get_device_buffer_frame_size(&stream.audio_unit)
.ok()
.map(|size| size as crate::FrameCount)
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/host/jack/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ impl StreamTrait for Stream {
self.playing.store(false, Ordering::SeqCst);
Ok(())
}

fn buffer_size(&self) -> Option<crate::FrameCount> {
Some(self.async_client.as_client().buffer_size() as crate::FrameCount)
}
}

type InputDataCallback = Box<dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static>;
Expand Down
Loading