diff --git a/.gitignore b/.gitignore index 2a54c6ab..e6706760 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ lib/ web/ *trace.json compile_commands.json +user_stories diff --git a/client-sdk-rust b/client-sdk-rust index f3064cd4..c013bb4f 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit f3064cd4b091b7f8cb5fa38b469bf3389edcadb9 +Subproject commit c013bb4fcef80e6c1a9b3b0f4ddab809d095c41e diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index f18c87aa..54dbfb45 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -35,7 +35,6 @@ namespace livekit { namespace { - inline void logAndThrow(const std::string& error_msg) { LK_LOG_ERROR("LiveKit SDK Error: {}", error_msg); throw std::runtime_error(error_msg); diff --git a/src/room.cpp b/src/room.cpp index d9e96a44..207fde7a 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -104,7 +104,6 @@ void Room::setDelegate(RoomDelegate* delegate) { bool Room::Connect(const std::string& url, const std::string& token, const RoomOptions& options) { TRACE_EVENT0("livekit", "Room::Connect"); - { const std::scoped_lock g(lock_); if (connection_state_ != ConnectionState::Disconnected) { @@ -112,8 +111,17 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO } connection_state_ = ConnectionState::Reconnecting; } - auto fut = FfiClient::instance().connectAsync(url, token, options); + + FfiClient::ListenerId listenerId = 0; try { + // Install listener (Room is fully initialized) [2] + listenerId = FfiClient::instance().AddListener([this](const proto::FfiEvent& e) { OnEvent(e); }); + { + const std::scoped_lock g(lock_); + listener_id_ = listenerId; + } + + auto fut = FfiClient::instance().connectAsync(url, token, options); // [1] auto connectCb = fut.get(); // fut will throw if it fails to connect to the room const auto& owned_room = connectCb.result().room(); @@ -141,6 +149,7 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO std::make_unique(std::move(participant_handle), pinfo.sid(), pinfo.name(), pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason); } + // Setup remote participants std::unordered_map> new_remote_participants; { @@ -178,17 +187,20 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO connection_state_ = ConnectionState::Connected; } - // Install listener (Room is fully initialized) - auto listenerId = FfiClient::instance().AddListener([this](const proto::FfiEvent& e) { OnEvent(e); }); + return true; + } catch (const std::exception& e) { + int listener_to_remove = 0; { const std::scoped_lock g(lock_); - listener_id_ = listenerId; + connection_state_ = ConnectionState::Disconnected; + if (listener_id_ == listenerId) { + listener_to_remove = listener_id_; + listener_id_ = 0; + } + } + if (listener_to_remove != 0) { + FfiClient::instance().RemoveListener(listener_to_remove); } - - return true; - } catch (const std::exception& e) { - // On error, set the connection_state_ to Disconnected - connection_state_ = ConnectionState::Disconnected; LK_LOG_ERROR("Room::Connect failed: {}", e.what()); return false; } diff --git a/src/tests/integration/test_late_join_track_publication.cpp b/src/tests/integration/test_late_join_track_publication.cpp new file mode 100644 index 00000000..8c746265 --- /dev/null +++ b/src/tests/integration/test_late_join_track_publication.cpp @@ -0,0 +1,641 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../common/audio_utils.h" +#include "../common/test_common.h" +#include "../common/video_utils.h" + +namespace livekit::test { + +using namespace std::chrono_literals; + +namespace { + +constexpr auto kWaitTimeout = 20s; +constexpr int kAudioTrackCount = 2; +constexpr int kVideoTrackCount = 2; +constexpr int kDataTrackCount = 4; +constexpr int kVideoWidth = 160; +constexpr int kVideoHeight = 90; + +struct ExpectedPublication { + std::string name; + TrackKind kind = TrackKind::KIND_UNKNOWN; +}; + +struct LateJoinPublicationState { + std::mutex mutex; + std::condition_variable cv; + std::string expected_publisher_identity; + std::map expected_media_tracks; + std::set expected_data_tracks; + std::map published_media_tracks; + std::map subscribed_media_tracks; + std::map published_data_tracks; + std::map published_media_counts; + std::map subscribed_media_counts; + std::map published_data_counts; + std::vector invariant_failures; +}; + +class LateJoinPublicationDelegate : public RoomDelegate { +public: + explicit LateJoinPublicationDelegate(LateJoinPublicationState& state) : state_(state) {} + + void onTrackPublished(Room& room, const TrackPublishedEvent& event) override { + if (!event.publication) { + return; + } + + validateMediaCallbackState(room, event.publication->name(), event.publication->kind(), "onTrackPublished"); + + std::lock_guard lock(state_.mutex); + state_.published_media_tracks[event.publication->name()] = event.publication->kind(); + ++state_.published_media_counts[event.publication->name()]; + state_.cv.notify_all(); + } + + void onTrackSubscribed(Room& room, const TrackSubscribedEvent& event) override { + if (!event.publication) { + return; + } + + validateMediaCallbackState(room, event.publication->name(), event.publication->kind(), "onTrackSubscribed"); + + std::lock_guard lock(state_.mutex); + state_.subscribed_media_tracks[event.publication->name()] = event.publication->kind(); + ++state_.subscribed_media_counts[event.publication->name()]; + state_.cv.notify_all(); + } + + void onDataTrackPublished(Room& room, const DataTrackPublishedEvent& event) override { + if (!event.track) { + return; + } + + validateDataCallbackState(room, event.track->info().name, event.track->publisherIdentity(), "onDataTrackPublished"); + + std::lock_guard lock(state_.mutex); + state_.published_data_tracks[event.track->info().name] = event.track->publisherIdentity(); + ++state_.published_data_counts[event.track->info().name]; + state_.cv.notify_all(); + } + +private: + static bool hasPublication(RemoteParticipant* participant, const std::string& name, TrackKind kind) { + if (participant == nullptr) { + return false; + } + + for (const auto& [sid, publication] : participant->trackPublications()) { + (void)sid; + if (publication && publication->name() == name && publication->kind() == kind) { + return true; + } + } + return false; + } + + void recordInvariantFailure(const std::string& message) { + std::lock_guard lock(state_.mutex); + state_.invariant_failures.push_back(message); + state_.cv.notify_all(); + } + + void validateCommonCallbackState(Room& room, const std::string& callback_name) { + if (room.localParticipant() == nullptr) { + recordInvariantFailure(callback_name + " fired before room.localParticipant() was initialized"); + } + + std::string expected_publisher_identity; + { + std::lock_guard lock(state_.mutex); + expected_publisher_identity = state_.expected_publisher_identity; + } + + if (!expected_publisher_identity.empty() && room.remoteParticipant(expected_publisher_identity) == nullptr) { + recordInvariantFailure(callback_name + + " fired before expected remote participant was visible: " + expected_publisher_identity); + } + } + + void validateMediaCallbackState(Room& room, const std::string& track_name, TrackKind kind, + const std::string& callback_name) { + validateCommonCallbackState(room, callback_name); + + std::string expected_publisher_identity; + bool expected_track = false; + { + std::lock_guard lock(state_.mutex); + expected_publisher_identity = state_.expected_publisher_identity; + const auto it = state_.expected_media_tracks.find(track_name); + expected_track = it != state_.expected_media_tracks.end() && it->second == kind; + } + + if (!expected_track) { + recordInvariantFailure(callback_name + " fired for unexpected media publication: " + track_name); + } + + auto* participant = room.remoteParticipant(expected_publisher_identity); + if (!hasPublication(participant, track_name, kind)) { + recordInvariantFailure(callback_name + " fired before expected remote publication was visible: " + track_name); + } + } + + void validateDataCallbackState(Room& room, const std::string& track_name, const std::string& publisher_identity, + const std::string& callback_name) { + validateCommonCallbackState(room, callback_name); + + std::string expected_publisher_identity; + bool expected_track = false; + { + std::lock_guard lock(state_.mutex); + expected_publisher_identity = state_.expected_publisher_identity; + expected_track = state_.expected_data_tracks.count(track_name) != 0; + } + + if (!expected_track) { + recordInvariantFailure(callback_name + " fired for unexpected data publication: " + track_name); + } + if (publisher_identity != expected_publisher_identity) { + recordInvariantFailure(callback_name + " publisher identity mismatch for " + track_name); + } + } + + LateJoinPublicationState& state_; +}; + +class MediaLoopGuard { +public: + MediaLoopGuard() = default; + MediaLoopGuard(const MediaLoopGuard&) = delete; + MediaLoopGuard& operator=(const MediaLoopGuard&) = delete; + + ~MediaLoopGuard() { stop(); } + + void addVideoSource(const std::shared_ptr& source, bool red_mode) { + threads_.emplace_back([this, source, red_mode]() { + runVideoLoop(source, running_, red_mode ? fillRedWrapper : fillWebcamWrapper, kVideoWidth, kVideoHeight); + }); + } + + void addAudioSource(const std::shared_ptr& source, double base_freq_hz, bool siren_mode) { + threads_.emplace_back( + [this, source, base_freq_hz, siren_mode]() { runToneLoop(source, running_, base_freq_hz, siren_mode); }); + } + + void stop() { + running_.store(false, std::memory_order_relaxed); + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + } + +private: + std::atomic running_{true}; + std::vector threads_; +}; + +class PublishedTrackGuard { +public: + explicit PublishedTrackGuard(LocalParticipant* participant) : participant_(participant) {} + + PublishedTrackGuard(const PublishedTrackGuard&) = delete; + PublishedTrackGuard& operator=(const PublishedTrackGuard&) = delete; + + ~PublishedTrackGuard() { unpublishAll(); } + + void addMediaTrack(const std::shared_ptr& track, const std::string& sid) { + media_tracks_.push_back({track, sid}); + } + + void addDataTrack(const std::shared_ptr& track) { data_tracks_.push_back(track); } + + void unpublishAll() { + if (participant_ != nullptr) { + for (const auto& track : media_tracks_) { + if (track.track && !track.sid.empty()) { + participant_->unpublishTrack(track.sid); + } + } + } + + for (const auto& track : data_tracks_) { + if (track && track->isPublished()) { + track->unpublishDataTrack(); + } + } + + media_tracks_.clear(); + data_tracks_.clear(); + } + +private: + struct PublishedMediaTrack { + std::shared_ptr track; + std::string sid; + }; + + LocalParticipant* participant_ = nullptr; + std::vector media_tracks_; + std::vector> data_tracks_; +}; + +bool hasExpectedMediaSubscriptions(const LateJoinPublicationState& state, + const std::vector& expected_media) { + for (const auto& expected : expected_media) { + const auto subscribed_it = state.subscribed_media_tracks.find(expected.name); + if (subscribed_it == state.subscribed_media_tracks.end() || subscribed_it->second != expected.kind) { + return false; + } + } + return true; +} + +bool hasExpectedDataPublications(const LateJoinPublicationState& state, const std::set& expected_data) { + for (const auto& name : expected_data) { + if (state.published_data_tracks.count(name) == 0 || state.published_data_counts.count(name) == 0) { + return false; + } + } + return true; +} + +const char* trackKindName(TrackKind kind) { + switch (kind) { + case TrackKind::KIND_AUDIO: + return "audio"; + case TrackKind::KIND_VIDEO: + return "video"; + case TrackKind::KIND_UNKNOWN: + break; + } + return "unknown"; +} + +std::string describeMediaTracks(const std::map& tracks) { + std::ostringstream out; + bool first = true; + for (const auto& [name, kind] : tracks) { + if (!first) { + out << ", "; + } + first = false; + out << name << "=" << trackKindName(kind); + } + return tracks.empty() ? "" : out.str(); +} + +std::string describeDataTracks(const std::map& tracks) { + std::ostringstream out; + bool first = true; + for (const auto& [name, publisher_identity] : tracks) { + if (!first) { + out << ", "; + } + first = false; + out << name << "=" << publisher_identity; + } + return tracks.empty() ? "" : out.str(); +} + +std::string describeInvariantFailures(const std::vector& failures) { + std::ostringstream out; + for (std::size_t i = 0; i < failures.size(); ++i) { + if (i != 0) { + out << "; "; + } + out << failures[i]; + } + return failures.empty() ? "" : out.str(); +} + +std::string makeTrackName(const std::string& prefix, int index) { + return prefix + "-" + std::to_string(index) + "-" + std::to_string(getTimestampUs()); +} + +} // namespace + +class LateJoinTrackPublicationIntegrationTest : public LiveKitTestBase, public ::testing::WithParamInterface {}; + +TEST_P(LateJoinTrackPublicationIntegrationTest, ConsumerReceivesAlreadyPublishedAudioTrackEvents) { + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + MediaLoopGuard media_loops; + std::vector expected_media; + + for (int i = 0; i < kAudioTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-audio", i); + auto source = std::make_shared(kDefaultAudioSampleRate, kDefaultAudioChannels, 0); + auto track = LocalAudioTrack::createLocalAudioTrack(track_name, source); + TrackPublishOptions publish_options; + publish_options.source = TrackSource::SOURCE_MICROPHONE; + + ASSERT_NO_THROW(publisher_room.localParticipant()->publishTrack(track, publish_options)); + ASSERT_NE(track->publication(), nullptr) << "Audio track was not locally published"; + + published_tracks.addMediaTrack(track, track->publication()->sid()); + media_loops.addAudioSource(source, 320.0 + static_cast(i) * 60.0, i % 2 == 1); + expected_media.push_back({track_name, TrackKind::KIND_AUDIO}); + } + + LateJoinPublicationState state; + state.expected_publisher_identity = publisher_identity; + for (const auto& expected : expected_media) { + state.expected_media_tracks[expected.name] = expected.kind; + } + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + // Pre-existing media publications are delivered in the Connect snapshot, not as + // TrackPublished room events. The late-joiner should still receive TrackSubscribed + // callbacks once auto-subscribe attaches to those snapshot publications. + const bool got_expected = + state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaSubscriptions(state, expected_media); }); + EXPECT_TRUE(got_expected) << "Timed out waiting for late-join audio subscription events\n" + << "Published media events: " << describeMediaTracks(state.published_media_tracks) << "\n" + << "Subscribed media events: " << describeMediaTracks(state.subscribed_media_tracks); + } + + std::map subscribed_media_snapshot; + std::map subscribed_media_counts; + std::vector invariant_failures; + { + std::lock_guard lock(state.mutex); + subscribed_media_snapshot = state.subscribed_media_tracks; + subscribed_media_counts = state.subscribed_media_counts; + invariant_failures = state.invariant_failures; + } + EXPECT_TRUE(invariant_failures.empty()) << describeInvariantFailures(invariant_failures); + + for (const auto& expected : expected_media) { + const auto subscribed_it = subscribed_media_snapshot.find(expected.name); + EXPECT_NE(subscribed_it, subscribed_media_snapshot.end()) + << "Missing onTrackSubscribed event for " << expected.name + << "; received: " << describeMediaTracks(subscribed_media_snapshot); + if (subscribed_it != subscribed_media_snapshot.end()) { + EXPECT_EQ(subscribed_it->second, expected.kind) << "Subscribed track kind mismatch for " << expected.name; + } + EXPECT_EQ(subscribed_media_counts[expected.name], 1) << "Unexpected onTrackSubscribed count for " << expected.name; + } + + auto* publisher_on_consumer = consumer_room.remoteParticipant(publisher_identity); + ASSERT_NE(publisher_on_consumer, nullptr); + + std::map remote_publications; + for (const auto& [sid, publication] : publisher_on_consumer->trackPublications()) { + (void)sid; + if (publication) { + remote_publications[publication->name()] = publication->kind(); + } + } + + for (const auto& expected : expected_media) { + const auto it = remote_publications.find(expected.name); + EXPECT_NE(it, remote_publications.end()) << "Late consumer snapshot missing publication " << expected.name + << "; snapshot: " << describeMediaTracks(remote_publications); + if (it != remote_publications.end()) { + EXPECT_EQ(it->second, expected.kind) << "Snapshot track kind mismatch for " << expected.name; + } + } + + media_loops.stop(); + published_tracks.unpublishAll(); +} + +TEST_P(LateJoinTrackPublicationIntegrationTest, ConsumerReceivesAlreadyPublishedVideoTrackEvents) { + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + MediaLoopGuard media_loops; + std::vector expected_media; + + for (int i = 0; i < kVideoTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-video", i); + auto source = std::make_shared(kVideoWidth, kVideoHeight); + auto track = LocalVideoTrack::createLocalVideoTrack(track_name, source); + TrackPublishOptions publish_options; + publish_options.source = TrackSource::SOURCE_CAMERA; + publish_options.simulcast = false; + + ASSERT_NO_THROW(publisher_room.localParticipant()->publishTrack(track, publish_options)); + ASSERT_NE(track->publication(), nullptr) << "Video track was not locally published"; + + published_tracks.addMediaTrack(track, track->publication()->sid()); + media_loops.addVideoSource(source, i % 2 == 1); + expected_media.push_back({track_name, TrackKind::KIND_VIDEO}); + } + + LateJoinPublicationState state; + state.expected_publisher_identity = publisher_identity; + for (const auto& expected : expected_media) { + state.expected_media_tracks[expected.name] = expected.kind; + } + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + // Pre-existing media publications are delivered in the Connect snapshot, not as + // TrackPublished room events. The late-joiner should still receive TrackSubscribed + // callbacks once auto-subscribe attaches to those snapshot publications. + const bool got_expected = + state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaSubscriptions(state, expected_media); }); + EXPECT_TRUE(got_expected) << "Timed out waiting for late-join video subscription events\n" + << "Published media events: " << describeMediaTracks(state.published_media_tracks) << "\n" + << "Subscribed media events: " << describeMediaTracks(state.subscribed_media_tracks); + } + + std::map subscribed_media_snapshot; + std::map subscribed_media_counts; + std::vector invariant_failures; + { + std::lock_guard lock(state.mutex); + subscribed_media_snapshot = state.subscribed_media_tracks; + subscribed_media_counts = state.subscribed_media_counts; + invariant_failures = state.invariant_failures; + } + EXPECT_TRUE(invariant_failures.empty()) << describeInvariantFailures(invariant_failures); + + for (const auto& expected : expected_media) { + const auto subscribed_it = subscribed_media_snapshot.find(expected.name); + EXPECT_NE(subscribed_it, subscribed_media_snapshot.end()) + << "Missing onTrackSubscribed event for " << expected.name + << "; received: " << describeMediaTracks(subscribed_media_snapshot); + if (subscribed_it != subscribed_media_snapshot.end()) { + EXPECT_EQ(subscribed_it->second, expected.kind) << "Subscribed track kind mismatch for " << expected.name; + } + EXPECT_EQ(subscribed_media_counts[expected.name], 1) << "Unexpected onTrackSubscribed count for " << expected.name; + } + + auto* publisher_on_consumer = consumer_room.remoteParticipant(publisher_identity); + ASSERT_NE(publisher_on_consumer, nullptr); + + std::map remote_publications; + for (const auto& [sid, publication] : publisher_on_consumer->trackPublications()) { + (void)sid; + if (publication) { + remote_publications[publication->name()] = publication->kind(); + } + } + + for (const auto& expected : expected_media) { + const auto it = remote_publications.find(expected.name); + EXPECT_NE(it, remote_publications.end()) << "Late consumer snapshot missing publication " << expected.name + << "; snapshot: " << describeMediaTracks(remote_publications); + if (it != remote_publications.end()) { + EXPECT_EQ(it->second, expected.kind) << "Snapshot track kind mismatch for " << expected.name; + } + } + + media_loops.stop(); + published_tracks.unpublishAll(); +} + +TEST_P(LateJoinTrackPublicationIntegrationTest, ConsumerReceivesAlreadyPublishedDataTrackEvents) { + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + std::set expected_data; + + for (int i = 0; i < kDataTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-data", i); + auto publish_result = publisher_room.localParticipant()->publishDataTrack(track_name); + ASSERT_TRUE(publish_result) << "Failed to publish data track " << track_name << ": " + << publish_result.error().message; + + const auto& track = publish_result.value(); + ASSERT_TRUE(track->isPublished()) << "Data track was not locally published: " << track_name; + + published_tracks.addDataTrack(track); + expected_data.insert(track_name); + } + + LateJoinPublicationState state; + state.expected_publisher_identity = publisher_identity; + state.expected_data_tracks = expected_data; + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + const bool got_expected = + state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedDataPublications(state, expected_data); }); + EXPECT_TRUE(got_expected) << "Timed out waiting for late-join data publication events\n" + << "Published data events: " << describeDataTracks(state.published_data_tracks); + } + + std::map data_snapshot; + std::map data_counts; + std::vector invariant_failures; + { + std::lock_guard lock(state.mutex); + data_snapshot = state.published_data_tracks; + data_counts = state.published_data_counts; + invariant_failures = state.invariant_failures; + } + EXPECT_TRUE(invariant_failures.empty()) << describeInvariantFailures(invariant_failures); + + EXPECT_EQ(data_snapshot.size(), expected_data.size()) + << "Late-joining consumer received an unexpected number of data track publication callbacks: " + << describeDataTracks(data_snapshot); + + for (const auto& name : expected_data) { + const auto it = data_snapshot.find(name); + EXPECT_NE(it, data_snapshot.end()) << "Missing onDataTrackPublished event for " << name; + if (it != data_snapshot.end()) { + EXPECT_EQ(it->second, publisher_identity) << "Publisher identity mismatch for data track " << name; + } + EXPECT_EQ(data_counts[name], 1) << "Unexpected onDataTrackPublished count for " << name; + } + + published_tracks.unpublishAll(); +} + +INSTANTIATE_TEST_SUITE_P(PeerConnectionModes, LateJoinTrackPublicationIntegrationTest, ::testing::Values(false, true), + [](const ::testing::TestParamInfo& info) { + return info.param ? "SinglePeerConnection" : "DualPeerConnection"; + }); + +} // namespace livekit::test diff --git a/src/tests/integration/test_room_listener_cleanup.cpp b/src/tests/integration/test_room_listener_cleanup.cpp new file mode 100644 index 00000000..ccc975fc --- /dev/null +++ b/src/tests/integration/test_room_listener_cleanup.cpp @@ -0,0 +1,146 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "../common/test_common.h" + +namespace livekit::test { + +using namespace std::chrono_literals; + +namespace { + +constexpr auto kListenerCleanupTimeout = 10s; +constexpr auto kDuplicateListenerGracePeriod = 500ms; + +class ParticipantConnectedCounter : public RoomDelegate { +public: + void onParticipantConnected(Room&, const ParticipantConnectedEvent& event) override { + if (event.participant == nullptr) { + return; + } + + std::lock_guard lock(mutex_); + ++counts_[event.participant->identity()]; + cv_.notify_all(); + } + + bool waitForCount(const std::string& identity, int count, std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [&]() { return countForLocked(identity) >= count; }); + } + + int countFor(const std::string& identity) const { + std::lock_guard lock(mutex_); + return countForLocked(identity); + } + +private: + int countForLocked(const std::string& identity) const { + const auto it = counts_.find(identity); + return it == counts_.end() ? 0 : it->second; + } + + mutable std::mutex mutex_; + std::condition_variable cv_; + std::map counts_; +}; + +void expectSingleParticipantConnectedCallback(ParticipantConnectedCounter& counter, const std::string& identity) { + ASSERT_TRUE(counter.waitForCount(identity, 1, kListenerCleanupTimeout)) + << "Expected one onParticipantConnected callback for " << identity; + + std::this_thread::sleep_for(kDuplicateListenerGracePeriod); + EXPECT_EQ(counter.countFor(identity), 1) << "Duplicate listener delivered multiple participant callbacks"; +} + +void expectFailedConnectDoesNotDuplicateParticipantCallbacks(const TestConfig& config, const std::string& failed_url, + const std::string& failed_token) { + RoomOptions options; + options.auto_subscribe = true; + + ParticipantConnectedCounter counter; + Room observed_room; + observed_room.setDelegate(&counter); + + EXPECT_FALSE(observed_room.Connect(failed_url, failed_token, options)) << "Initial failing Connect() should fail"; + + ASSERT_TRUE(observed_room.Connect(config.url, config.token_a, options)) << "Reconnect after failed Connect() failed"; + ASSERT_NE(observed_room.localParticipant(), nullptr); + + Room peer_room; + ASSERT_TRUE(peer_room.Connect(config.url, config.token_b, options)) << "Peer failed to connect"; + ASSERT_NE(peer_room.localParticipant(), nullptr); + const std::string peer_identity = peer_room.localParticipant()->identity(); + ASSERT_FALSE(peer_identity.empty()); + + expectSingleParticipantConnectedCallback(counter, peer_identity); +} + +} // namespace + +class RoomListenerCleanupIntegrationTest : public LiveKitTestBase {}; + +TEST_F(RoomListenerCleanupIntegrationTest, FailedInvalidTokenConnectDoesNotLeaveDuplicateListener) { + if (!config_.available) { + throw std::runtime_error("LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"); + } + + expectFailedConnectDoesNotDuplicateParticipantCallbacks(config_, config_.url, "invalid_token"); +} + +TEST_F(RoomListenerCleanupIntegrationTest, FailedInvalidUrlConnectDoesNotLeaveDuplicateListener) { + if (!config_.available) { + throw std::runtime_error("LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"); + } + + expectFailedConnectDoesNotDuplicateParticipantCallbacks(config_, "ws://127.0.0.1:9", config_.token_a); +} + +TEST_F(RoomListenerCleanupIntegrationTest, AlreadyConnectedConnectDoesNotReplaceOrLeakListener) { + if (!config_.available) { + throw std::runtime_error("LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"); + } + + RoomOptions options; + options.auto_subscribe = true; + + ParticipantConnectedCounter counter; + Room observed_room; + observed_room.setDelegate(&counter); + + ASSERT_TRUE(observed_room.Connect(config_.url, config_.token_a, options)) << "Initial Connect() failed"; + ASSERT_NE(observed_room.localParticipant(), nullptr); + + EXPECT_THROW((void)observed_room.Connect(config_.url, config_.token_a, options), std::runtime_error); + + Room peer_room; + ASSERT_TRUE(peer_room.Connect(config_.url, config_.token_b, options)) << "Peer failed to connect"; + ASSERT_NE(peer_room.localParticipant(), nullptr); + const std::string peer_identity = peer_room.localParticipant()->identity(); + ASSERT_FALSE(peer_identity.empty()); + + expectSingleParticipantConnectedCallback(counter, peer_identity); +} + +} // namespace livekit::test diff --git a/src/tests/stress/test_room_listener_race_stress.cpp b/src/tests/stress/test_room_listener_race_stress.cpp new file mode 100644 index 00000000..85ae0f3d --- /dev/null +++ b/src/tests/stress/test_room_listener_race_stress.cpp @@ -0,0 +1,164 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../common/test_common.h" + +namespace livekit::test { + +using namespace std::chrono_literals; + +namespace { + +constexpr int kListenerRaceIterations = 5; +constexpr int kTracksPerPublisher = 3; +constexpr auto kConnectTimeout = 30s; + +void addStressError(std::vector& errors, std::mutex& errors_mutex, const std::string& message) { + std::lock_guard lock(errors_mutex); + errors.push_back(message); +} + +std::string describeStressErrors(const std::vector& errors) { + std::ostringstream out; + for (std::size_t i = 0; i < errors.size(); ++i) { + if (i != 0) { + out << "; "; + } + out << errors[i]; + } + return errors.empty() ? "" : out.str(); +} + +} // namespace + +class RoomListenerRaceStressTest : public LiveKitTestBase {}; + +TEST_F(RoomListenerRaceStressTest, ConnectFailDestroyReconnectAndPublishDataTracksConcurrently) { + if (!config_.available) { + GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_TOKEN_A, and LIVEKIT_TOKEN_B not set"; + } + + for (int iteration = 0; iteration < kListenerRaceIterations; ++iteration) { + std::atomic connected_publishers{0}; + std::atomic publish_start{false}; + std::mutex errors_mutex; + std::vector errors; + std::vector publishers; + publishers.reserve(2); + + const std::array tokens = {config_.token_a, config_.token_b}; + + for (std::size_t publisher_index = 0; publisher_index < tokens.size(); ++publisher_index) { + publishers.emplace_back([&, publisher_index]() { + try { + RoomOptions options; + options.auto_subscribe = true; + + { + Room failed_room; + if (failed_room.Connect("ws://127.0.0.1:9", tokens[publisher_index], options)) { + addStressError(errors, errors_mutex, "unexpected successful failed connect"); + return; + } + } + + Room room; + if (!room.Connect(config_.url, tokens[publisher_index], options)) { + addStressError(errors, errors_mutex, "valid connect failed"); + return; + } + + auto* local_participant = room.localParticipant(); + if (local_participant == nullptr) { + addStressError(errors, errors_mutex, "local participant missing after valid connect"); + return; + } + + connected_publishers.fetch_add(1, std::memory_order_release); + while (!publish_start.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + + for (int track_index = 0; track_index < kTracksPerPublisher; ++track_index) { + const std::string track_name = "listener-race-" + std::to_string(iteration) + "-" + + std::to_string(publisher_index) + "-" + std::to_string(track_index); + auto publish_result = local_participant->publishDataTrack(track_name); + if (!publish_result) { + addStressError(errors, errors_mutex, + "publishDataTrack failed for " + track_name + ": " + publish_result.error().message); + continue; + } + + const auto& track = publish_result.value(); + if (!track || !track->isPublished()) { + addStressError(errors, errors_mutex, "published data track not marked published: " + track_name); + } + } + + std::this_thread::sleep_for(50ms); + } catch (const std::exception& e) { + addStressError(errors, errors_mutex, std::string("exception in publisher thread: ") + e.what()); + } catch (...) { + addStressError(errors, errors_mutex, "unknown exception in publisher thread"); + } + }); + } + + const auto connect_deadline = std::chrono::steady_clock::now() + kConnectTimeout; + while (connected_publishers.load(std::memory_order_acquire) < static_cast(tokens.size()) && + std::chrono::steady_clock::now() < connect_deadline) { + { + std::lock_guard lock(errors_mutex); + if (!errors.empty()) { + break; + } + } + std::this_thread::sleep_for(10ms); + } + + if (connected_publishers.load(std::memory_order_acquire) != static_cast(tokens.size())) { + addStressError(errors, errors_mutex, "timed out waiting for publishers to connect"); + } + + publish_start.store(true, std::memory_order_release); + for (auto& publisher : publishers) { + if (publisher.joinable()) { + publisher.join(); + } + } + + std::vector errors_snapshot; + { + std::lock_guard lock(errors_mutex); + errors_snapshot = errors; + } + ASSERT_TRUE(errors_snapshot.empty()) << "iteration " << iteration << ": " << describeStressErrors(errors_snapshot); + } +} + +} // namespace livekit::test