Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions EventThreadingModel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
## Summary

FFI events whose handlers don't touch Unity APIs run directly on the FFI callback thread instead of being marshaled to Unity's main thread via `SynchronizationContext.Post`.

`FFICallback` previously usually routes Rust events (except `AudioStreamEvent`) through `_context.Post` to Unity's main thread. That's the safe default for handlers that touch Unity APIs (`Texture2D`, `GameObject`, `Transform`, …) but it costs one frame of latency for handlers that don't. Four categories of events can skip that:

1. **Audio stream events** that are written to the audio stream ring buffer and consumed on audio thread.
2. **One-shot async completions** that only flip `IsDone` on a `YieldInstruction` — `SetMetadata`, `UnpublishTrack`, all stream `Read/Write/Close` ops.
3. **Stream reader chunk events** that just append bytes/strings to an internal buffer.
4. **Log batches** — `UnityEngine.Debug.unityLogger` is documented thread-safe; the post hop adds latency without benefit, especially during error storms or `LK_VERBOSE` noise.

## Logic in code:

```csharp
internal static void RouteFfiEvent(FfiEvent response)
{
if (_isDisposed) return;

// 1. Per-event-type fast paths — invoke handler directly on FFI thread.
if (response.MessageCase == FfiEvent.MessageOneofCase.AudioStreamEvent) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.Logs) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.ByteStreamReaderEvent) { ...; return; }
if (response.MessageCase == FfiEvent.MessageOneofCase.TextStreamReaderEvent) { ...; return; }

// 2. One-shot completion fast path — opted-in pending callbacks complete inline.
var requestAsyncId = ExtractRequestAsyncId(response);
if (requestAsyncId.HasValue && Instance.TrySkipDispatch(requestAsyncId.Value, response))
return;

// 3. Fallback — post to Unity's main-thread sync context.
Instance._context?.Post(static (resp) =>
{
var r = resp as FfiEvent;
if (r == null) return;
DispatchEvent(r);
}, response);
}
```

## Event Table

| Event | Where it runs | Why |
| --- | --- | --- |
| `AudioStreamEvent` | **FFI thread** (unchanged) | Audio thread consumes the data; main-thread latency would hurt timing |
| `Logs` | **FFI thread** (new) | `Debug.unityLogger` is thread-safe; logs reach console immediately during panics / errors |
| `ByteStreamReaderEvent` | **FFI thread** (new) | Internal buffer is now lock-protected; chunks land without frame delay |
| `TextStreamReaderEvent` | **FFI thread** (new) | Same lock as byte path (shared `ReadIncrementalInstructionBase`) |
| One-shot completions via `FfiInstruction<T>` | **FFI thread** (new) | `SetLocalMetadata`, `SetLocalName`, `SetLocalAttributes`, `UnpublishTrack` — only flip `IsDone`/`IsError` |
| One-shot completions via `FfiStreamInstruction<T>` | **FFI thread** (new) | `ByteStreamWriter.Write/Close`, `TextStreamWriter.Write/Close` |
| One-shot completions via `FfiStreamResultInstruction<T,U>` | **FFI thread** (new) | `ByteStreamReader.ReadAll/WriteToFile`, `TextStreamReader.ReadAll` |
| `RoomEvent` | Main thread | Fires user-facing `ParticipantConnected`, `TrackPublished`, etc. |
| `TrackEvent` | Main thread | (No subscribers today; main-thread default for safety) |
| `RpcMethodInvocation` | Main thread | User RPC handlers commonly touch game state |
| `Disconnect` | Main thread | UI updates typical |
| `VideoStreamEvent` | Main thread | Internal buffering is fast; user-facing raw delivery deferred (see follow-ups) |
| `DataTrackStreamEvent` | Main thread | Deferred until a concrete consumer asks |
| `Connect` (one-shot) | Main thread | Bespoke handler fires participant-connected events |
| `PublishTrack` (one-shot) | Main thread | Bespoke handler |
| `GetStats` (one-shot) | Main thread | Bespoke handler |
| `CaptureAudioFrame` (one-shot) | Main thread | Bespoke handler |
| `PerformRpc` (one-shot) | Main thread | Bespoke handler surfaces response |
| `SendText` / `SendFile` (one-shot) | Main thread | Bespoke handlers |
| `TextStreamOpen` / `ByteStreamOpen` (one-shot) | Main thread | Bespoke handlers return writer objects |
| `PublishDataTrack` (one-shot) | Main thread | Bespoke handler |
7 changes: 7 additions & 0 deletions EventThreadingModel.md.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Runtime/Plugins/Google.Protobuf.dll.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-ios-arm64/liblivekit_ffi.a
Git LFS file not shown
4 changes: 2 additions & 2 deletions Runtime/Plugins/ffi-macos-arm64/liblivekit_ffi.dylib
Git LFS file not shown
8 changes: 8 additions & 0 deletions Runtime/Plugins/iOS.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions Runtime/Plugins/iOS/LiveKitAudioSession.mm
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2024 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#import <AVFoundation/AVFoundation.h>

extern "C" {

/// Configures the iOS audio session for VoIP/WebRTC use.
/// This sets AVAudioSessionCategoryPlayAndRecord with VoiceChat mode,
/// which enables the VPIO (Voice Processing IO) AudioUnit for:
/// - Hardware echo cancellation (AEC)
/// - Automatic gain control (AGC)
/// - Noise suppression (NS)
///
/// Call this before creating PlatformAudio to ensure WebRTC can
/// properly initialize the microphone and speaker.
void LiveKit_ConfigureAudioSessionForVoIP() {
AVAudioSession* session = [AVAudioSession sharedInstance];
NSError* error = nil;

// Configure for VoIP with echo cancellation
BOOL success = [session setCategory:AVAudioSessionCategoryPlayAndRecord
mode:AVAudioSessionModeVoiceChat
options:AVAudioSessionCategoryOptionDefaultToSpeaker |
AVAudioSessionCategoryOptionAllowBluetooth |
AVAudioSessionCategoryOptionAllowBluetoothA2DP
error:&error];

if (!success || error) {
NSLog(@"LiveKit: Failed to configure VoIP audio session: %@", error.localizedDescription);
return;
}

// Activate the audio session
success = [session setActive:YES error:&error];
if (!success || error) {
NSLog(@"LiveKit: Failed to activate audio session: %@", error.localizedDescription);
return;
}

NSLog(@"LiveKit: Audio session configured for VoIP (PlayAndRecord + VoiceChat mode)");
}

/// Restores the audio session to the default ambient category.
/// Call this when PlatformAudio is disposed if you want to restore
/// the original audio behavior.
void LiveKit_RestoreDefaultAudioSession() {
AVAudioSession* session = [AVAudioSession sharedInstance];
NSError* error = nil;

[session setCategory:AVAudioSessionCategoryAmbient error:&error];
if (error) {
NSLog(@"LiveKit: Failed to restore default audio session: %@", error.localizedDescription);
}
}

}
42 changes: 42 additions & 0 deletions Runtime/Plugins/iOS/LiveKitAudioSession.mm.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Runtime/Plugins/iOS/liblivekit_ffi.a
Git LFS file not shown
27 changes: 27 additions & 0 deletions Runtime/Plugins/iOS/liblivekit_ffi.a.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 39 additions & 17 deletions Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
private readonly Queue<TContent> _pendingChunks = new();
private TContent _latestChunk;

// Chunk events arrive on the FFI thread; Reset() and the LatestChunk getter
// run on the main-thread coroutine. _gate serializes mutations of the queue,
// _latestChunk, IsCurrentReadDone, IsEos, and Error across both sides.
private readonly object _gate = new();

/// <summary>
/// Error that occurred on the last read, if any.
/// </summary>
Expand All @@ -94,8 +99,11 @@ protected TContent LatestChunk
{
get
{
if (Error != null) throw Error;
return _latestChunk;
lock (_gate)
{
if (Error != null) throw Error;
return _latestChunk;
}
}
}

Expand All @@ -108,34 +116,48 @@ protected ReadIncrementalInstructionBase(FfiHandle readerHandle)

protected void OnChunk(TContent content)
{
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
lock (_gate)
{
_latestChunk = content;
IsCurrentReadDone = true;
if (IsCurrentReadDone)
{
// Consumer hasn't yielded since the last chunk; buffer until Reset().
_pendingChunks.Enqueue(content);
}
else
{
_latestChunk = content;
IsCurrentReadDone = true;
}
}
}

public override void Reset()
{
base.Reset();
if (_pendingChunks.Count > 0)
// base.Reset() must run under the same lock as OnChunk, otherwise the
// window between IsCurrentReadDone=false (from base) and the dequeue
// below lets a producer race in, write _latestChunk, and have its
// chunk immediately overwritten by the dequeue. That race lost ~4% of
// chunks under stress before this fix.
lock (_gate)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
base.Reset();
if (_pendingChunks.Count > 0)
{
_latestChunk = _pendingChunks.Dequeue();
IsCurrentReadDone = true;
}
}
}

protected void OnEos(Proto.StreamError protoError)
{
IsEos = true;
if (protoError != null)
lock (_gate)
{
Error = new StreamError(protoError);
IsEos = true;
if (protoError != null)
{
Error = new StreamError(protoError);
}
}
}
}
Expand Down
Loading
Loading