Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker-validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ permissions:

env:
# Pinned commit for cpp-example-collection smoke build (https://github.com/livekit-examples/cpp-example-collection)
CPP_EXAMPLE_COLLECTION_REF: f231c0c75028d1dcf13edcecd369d030d2c7c8d4
CPP_EXAMPLE_COLLECTION_REF: 46083ea4e5d3def8e44a53148c2c7800131efca0
Comment thread
stephen-derosa marked this conversation as resolved.

jobs:
validate-x64:
Expand Down
6 changes: 2 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ jobs:
- os: macos-26-xlarge
name: macos-arm64
build_cmd: ./build.sh release-tests
# E2E not possible on GHA Mac runner currently
e2e-testing: false
e2e-testing: true
Comment thread
stephen-derosa marked this conversation as resolved.
- os: macos-26-large
name: macos-x64
build_cmd: ./build.sh release-tests --macos-arch x86_64
# E2E not possible on GHA Mac runner currently
e2e-testing: false
e2e-testing: true
# Pinned to Windows 2022 for current VS 17 implementation
- os: windows-2022
name: windows-x64
Expand Down
2 changes: 1 addition & 1 deletion cpp-example-collection
Submodule cpp-example-collection updated 54 files
+8 −8 CMakeLists.txt
+23 −29 basic_room/capture_utils.cpp
+2 −5 basic_room/capture_utils.h
+21 −36 basic_room/main.cpp
+0 −0 hello_livekit/receiver/CMakeLists.txt
+24 −23 hello_livekit/receiver/main.cpp
+0 −0 hello_livekit/sender/CMakeLists.txt
+23 −21 hello_livekit/sender/main.cpp
+0 −0 logging_levels/basic_usage/CMakeLists.txt
+33 −48 logging_levels/basic_usage/main.cpp
+0 −0 logging_levels/custom_sinks/CMakeLists.txt
+72 −83 logging_levels/custom_sinks/main.cpp
+0 −0 ping_pong/ping/CMakeLists.txt
+0 −0 ping_pong/ping/constants.h
+10 −13 ping_pong/ping/json_converters.cpp
+6 −6 ping_pong/ping/json_converters.h
+35 −49 ping_pong/ping/main.cpp
+0 −0 ping_pong/ping/messages.h
+4 −4 ping_pong/ping/utils.h
+0 −0 ping_pong/pong/CMakeLists.txt
+0 −0 ping_pong/pong/constants.h
+10 −13 ping_pong/pong/json_converters.cpp
+6 −6 ping_pong/pong/json_converters.h
+24 −30 ping_pong/pong/main.cpp
+0 −0 ping_pong/pong/messages.h
+4 −4 ping_pong/pong/utils.h
+42 −67 simple_data_stream/main.cpp
+0 −0 simple_joystick/receiver/CMakeLists.txt
+4 −5 simple_joystick/receiver/json_utils.cpp
+2 −2 simple_joystick/receiver/json_utils.h
+17 −21 simple_joystick/receiver/main.cpp
+11 −18 simple_joystick/receiver/utils.cpp
+1 −1 simple_joystick/receiver/utils.h
+0 −0 simple_joystick/sender/CMakeLists.txt
+4 −5 simple_joystick/sender/json_utils.cpp
+2 −2 simple_joystick/sender/json_utils.h
+47 −57 simple_joystick/sender/main.cpp
+11 −18 simple_joystick/sender/utils.cpp
+1 −1 simple_joystick/sender/utils.h
+22 −29 simple_room/fallback_capture.cpp
+2 −5 simple_room/fallback_capture.h
+52 −83 simple_room/main.cpp
+21 −29 simple_room/sdl_media.cpp
+13 −17 simple_room/sdl_media.h
+49 −66 simple_room/sdl_media_manager.cpp
+9 −9 simple_room/sdl_media_manager.h
+15 −19 simple_room/sdl_video_renderer.cpp
+5 −4 simple_room/sdl_video_renderer.h
+8 −14 simple_room/wav_audio_source.cpp
+5 −5 simple_room/wav_audio_source.h
+111 −164 simple_rpc/main.cpp
+5 −6 user_timestamped_video/common/cli_utils.h
+25 −40 user_timestamped_video/consumer/main.cpp
+16 −28 user_timestamped_video/producer/main.cpp
143 changes: 45 additions & 98 deletions src/tests/integration/test_data_track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
#include <livekit/e2ee.h>
#include <livekit/remote_data_track.h>

#include <cmath>
#include <condition_variable>
#include <exception>
#include <future>
#include <tuple>

#include "../common/test_common.h"
#include "ffi_client.h"
#include "lk_log.h"

namespace livekit::test {

Expand All @@ -45,9 +42,12 @@ constexpr char kTrackNamePrefix[] = "data_track_e2e";
constexpr auto kTrackWaitTimeout = 10s;
constexpr auto kPollingInterval = 10ms;
constexpr int kResubscribeIterations = 10;
constexpr std::size_t kSinglePacketPayloadBytes = 8192;
constexpr int kPublishManyTrackCount = 256;
constexpr auto kPublishManyTimeout = 5s;
constexpr std::size_t kLargeFramePayloadBytes = 196608;
constexpr auto kTransportFrameTimeout = 15s;
constexpr std::uint8_t kTransportPayloadValue = 0xFA;
constexpr char kE2EESharedSecret[] = "password";
constexpr int kE2EEFrameCount = 5;
constexpr int kTimestampFrameAttempts = 200;
Expand Down Expand Up @@ -287,24 +287,15 @@ void runEncryptedDataTrackRoundTrip(KeyDerivationFunction key_derivation_functio

class DataTrackE2ETest : public LiveKitTestBase {};

class DataTrackTransportTest : public DataTrackE2ETest,
public ::testing::WithParamInterface<std::tuple<double, size_t>> {};
class DataTrackTransportTest : public DataTrackE2ETest, public ::testing::WithParamInterface<std::size_t> {};

class DataTrackKeyDerivationTest : public DataTrackE2ETest,
public ::testing::WithParamInterface<KeyDerivationFunction> {};

TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) {
Comment thread
stephen-derosa marked this conversation as resolved.
const auto publish_fps = std::get<0>(GetParam());
const auto payload_len = std::get<1>(GetParam());
const auto payload_len = GetParam();
const auto track_name = makeTrackName("transport");

// How long to publish frames for.
constexpr auto PUBLISH_DURATION = 10s;

// Percentage of total frames that must be received on the subscriber end in
// order for the test to pass.
constexpr float MIN_PERCENTAGE = 0.90f;

std::vector<TestRoomConnectionOptions> room_configs(2);
room_configs[0].room_options.single_peer_connection = false;
room_configs[1].room_options.single_peer_connection = false;
Expand All @@ -316,104 +307,57 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) {
auto& publisher_room = rooms[0];
const auto publisher_identity = publisher_room->localParticipant()->identity();

auto track = requirePublishedTrack(publisher_room->localParticipant(), track_name);
std::cerr << "Track published\n";
auto local_track = requirePublishedTrack(publisher_room->localParticipant(), track_name);
ASSERT_TRUE(local_track->isPublished());
EXPECT_FALSE(local_track->info().uses_e2ee);
EXPECT_EQ(local_track->info().name, track_name);

auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout);
std::cerr << "Got remote track: " << remote_track->info().sid << "\n";

ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track";
EXPECT_TRUE(remote_track->isPublished());
EXPECT_FALSE(remote_track->info().uses_e2ee);
EXPECT_EQ(remote_track->info().name, track_name);
EXPECT_EQ(remote_track->publisherIdentity(), publisher_identity);

const auto frame_count =
static_cast<size_t>(std::llround(std::chrono::duration<double>(PUBLISH_DURATION).count() * publish_fps));

auto publish = [&]() {
if (!track->isPublished()) {
throw std::runtime_error("Publisher failed to publish data track");
}
if (track->info().uses_e2ee) {
throw std::runtime_error("Unexpected E2EE on test data track");
}
if (track->info().name != track_name) {
throw std::runtime_error("Published track name mismatch");
}

const auto frame_interval = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(1.0 / publish_fps));
auto next_send = std::chrono::steady_clock::now();

std::cout << "Publishing " << frame_count << " frames with payload length " << payload_len << '\n';
for (size_t index = 0; index < frame_count; ++index) {
std::vector<std::uint8_t> payload(payload_len, static_cast<std::uint8_t>(index));
requirePushSuccess(track->tryPush(std::move(payload)), "Failed to push data frame");

next_send += frame_interval;
std::this_thread::sleep_until(next_send);
}

track->unpublishDataTrack();
};

auto subscribe_result = remote_track->subscribe();
if (!subscribe_result) {
FAIL() << describeDataTrackError(subscribe_result.error());
}
auto subscription = subscribe_result.value();

std::promise<size_t> receive_count_promise;
auto receive_count_future = receive_count_promise.get_future();

auto subscribe = [&]() {
size_t received_count = 0;
std::atomic<bool> keep_publishing{true};
auto publisher = std::async(std::launch::async, [&]() {
DataTrackFrame frame;
while (subscription->read(frame) && received_count < frame_count) {
if (frame.payload.empty()) {
throw std::runtime_error("Received empty data frame");
}

const auto first_byte = frame.payload.front();
if (!std::all_of(frame.payload.begin(), frame.payload.end(),
[first_byte](std::uint8_t byte) { return byte == first_byte; })) {
throw std::runtime_error("Received frame with inconsistent payload");
}
if (frame.user_timestamp.has_value()) {
throw std::runtime_error("Received unexpected user timestamp in transport test");
}

++received_count;
frame.payload.assign(payload_len, kTransportPayloadValue);
while (keep_publishing.load()) {
requirePushSuccess(local_track->tryPush(frame), "Failed to push data frame");
std::this_thread::sleep_for(50ms);
}
});

receive_count_promise.set_value(received_count);
};

// Launch both publisher and subscriber
auto pub_fut = std::async(std::launch::async, publish);
auto sub_fut = std::async(std::launch::async, subscribe);

// Wait for both, with a combined deadline (the timeout(...) wrapper).
const auto deadline = std::chrono::steady_clock::now() + PUBLISH_DURATION + 25s;

const bool pub_ok = pub_fut.wait_until(deadline) == std::future_status::ready;
const bool sub_ok = sub_fut.wait_until(deadline) == std::future_status::ready;

if (!pub_ok || !sub_ok) {
ADD_FAILURE() << "Timed out waiting for data frames";
DataTrackFrame frame;
std::exception_ptr read_error;
try {
frame = readFrameWithTimeout(subscription, kTransportFrameTimeout);
} catch (...) {
read_error = std::current_exception();
}

// Equivalent of `try_join!`'s ? — re-throws any exception from either task
pub_fut.get();
sub_fut.get();
const bool remote_track_published_after_read = remote_track->isPublished();
keep_publishing.store(false);
subscription->close();
local_track->unpublishDataTrack();

const auto received_count = receive_count_future.get();
const auto received_percent = static_cast<float>(received_count) / static_cast<float>(frame_count);
std::cout << "Received " << received_count << "/" << frame_count << " frames (" << received_percent * 100.0f << "%)"
<< '\n';
publisher.get();
if (read_error) {
std::rethrow_exception(read_error);
}

EXPECT_GE(received_percent, MIN_PERCENTAGE) << "Received " << received_count << "/" << frame_count << " frames";
ASSERT_EQ(frame.payload.size(), payload_len);
EXPECT_TRUE(std::all_of(frame.payload.begin(), frame.payload.end(),
[](std::uint8_t byte) { return byte == kTransportPayloadValue; }));
EXPECT_FALSE(frame.user_timestamp.has_value());
EXPECT_TRUE(remote_track_published_after_read);
}

TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) {
Expand Down Expand Up @@ -853,20 +797,23 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampOnEncryptedDataTrack) {
local_track->unpublishDataTrack();
}

std::string dataTrackParamName(const ::testing::TestParamInfo<std::tuple<double, size_t>>& info) {
if (std::get<0>(info.param) > 100.0) {
return "HighFpsSinglePacket";
std::string dataTrackPayloadParamName(const ::testing::TestParamInfo<std::size_t>& info) {
if (info.param == kSinglePacketPayloadBytes) {
return "SinglePacket";
}
if (info.param == kLargeFramePayloadBytes) {
return "MultiPacket";
}
return "LowFpsMultiPacket";
return "Payload" + std::to_string(info.param);
}

std::string keyDerivationParamName(const ::testing::TestParamInfo<KeyDerivationFunction>& info) {
return keyDerivationFunctionName(info.param);
}

INSTANTIATE_TEST_SUITE_P(DataTrackScenarios, DataTrackTransportTest,
::testing::Values(std::make_tuple(120.0, size_t{8192}), std::make_tuple(10.0, size_t{196608})),
dataTrackParamName);
INSTANTIATE_TEST_SUITE_P(DataTrackPayloads, DataTrackTransportTest,
::testing::Values(kSinglePacketPayloadBytes, kLargeFramePayloadBytes),
dataTrackPayloadParamName);

INSTANTIATE_TEST_SUITE_P(KeyDerivationFunctions, DataTrackKeyDerivationTest,
::testing::Values(KeyDerivationFunction::PBKDF2, KeyDerivationFunction::HKDF),
Expand Down
Loading