Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client-sdk-rust
18 changes: 17 additions & 1 deletion include/livekit/data_track_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mutex>
#include <optional>

#include "livekit/data_track_error.h"
#include "livekit/data_track_frame.h"
#include "livekit/ffi_handle.h"

Expand Down Expand Up @@ -79,6 +80,15 @@ class DataTrackStream {
*/
bool read(DataTrackFrame& out);

/**
* Returns the terminal subscription error reported by the FFI stream.
*
* This is set when read() returns false because subscription establishment
* failed before any frames were emitted. It remains empty for normal EOS or
* when close() ends the stream locally.
*/
std::optional<SubscribeDataTrackError> terminalError() const;

/**
* End the stream early.
*
Expand All @@ -89,6 +99,9 @@ class DataTrackStream {

private:
friend class RemoteDataTrack;
#ifdef LIVEKIT_TEST_ACCESS
friend class DataTrackStreamTest;
#endif

DataTrackStream() = default;
/// Internal init helper, called by RemoteDataTrack.
Expand All @@ -101,7 +114,7 @@ class DataTrackStream {
void pushFrame(DataTrackFrame&& frame);

/// Push an end-of-stream signal (EOS).
void pushEos();
void pushEos(std::optional<SubscribeDataTrackError> error = std::nullopt);

/** Protects all mutable state below. */
mutable std::mutex mutex_;
Expand All @@ -122,6 +135,9 @@ class DataTrackStream {
/** True after close() has been called by the consumer. */
bool closed_{false};

/** Typed terminal error reported with EOS, if subscription setup failed. */
std::optional<SubscribeDataTrackError> terminal_error_;

/** RAII handle for the Rust-owned subscription resource. */
FfiHandle subscription_handle_;

Expand Down
3 changes: 3 additions & 0 deletions src/data_track_error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "livekit/data_track_error.h"

#include "data_track.pb.h"
#include "lk_log.h"

namespace livekit {

Expand Down Expand Up @@ -95,6 +96,8 @@ LocalDataTrackTryPushError LocalDataTrackTryPushError::fromProto(const proto::Lo
}

SubscribeDataTrackError SubscribeDataTrackError::fromProto(const proto::SubscribeDataTrackError& error) {
LK_LOG_WARN("Subscribe data track error from FFI: code={} message={}", static_cast<std::uint32_t>(error.code()),
error.message());
return SubscribeDataTrackError{fromProtoCode(error.code()), error.message()};
}

Expand Down
22 changes: 19 additions & 3 deletions src/data_track_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ bool DataTrackStream::read(DataTrackFrame& out) {
return true;
}

std::optional<SubscribeDataTrackError> DataTrackStream::terminalError() const {
const std::scoped_lock<std::mutex> lock(mutex_);
return terminal_error_;
Comment thread
stephen-derosa marked this conversation as resolved.
}

void DataTrackStream::close() {
std::int32_t listener_id = -1;
{
Expand All @@ -73,9 +78,14 @@ void DataTrackStream::close() {
return;
}
closed_ = true;
// Preserve errors reported by EOS for post-stream inspection, but do not
// treat a local early close as a terminal subscription error.
if (!eof_) {
terminal_error_.reset();
}
subscription_handle_.reset();
listener_id = listener_id_;
listener_id_ = 0;
listener_id_ = -1;
}

if (listener_id != -1) {
Expand Down Expand Up @@ -103,7 +113,12 @@ void DataTrackStream::onFfiEvent(const FfiEvent& event) {
DataTrackFrame frame = DataTrackFrame::fromOwnedInfo(fr);
pushFrame(std::move(frame));
} else if (dts.has_eos()) {
pushEos();
std::optional<SubscribeDataTrackError> error;
const auto& eos = dts.eos();
if (eos.has_error()) {
error = SubscribeDataTrackError::fromProto(eos.error());
}
pushEos(std::move(error));
}
}

Expand All @@ -123,13 +138,14 @@ void DataTrackStream::pushFrame(DataTrackFrame&& frame) {
cv_.notify_one();
}

void DataTrackStream::pushEos() {
void DataTrackStream::pushEos(std::optional<SubscribeDataTrackError> error) {
{
const std::scoped_lock<std::mutex> lock(mutex_);
if (eof_) {
Comment thread
stephen-derosa marked this conversation as resolved.
return;
}
eof_ = true;
terminal_error_ = std::move(error);
}
cv_.notify_all();
}
Expand Down
17 changes: 11 additions & 6 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ inline void logAndThrow(const std::string& error_msg) {
throw std::runtime_error(error_msg);
}

Result<proto::OwnedDataTrackStream, SubscribeDataTrackError> subscribeDataTrackFailure(SubscribeDataTrackErrorCode code,
const std::string& message) {
LK_LOG_WARN("Subscribe data track failed: code={} message={}", static_cast<std::uint32_t>(code), message);
return Result<proto::OwnedDataTrackStream, SubscribeDataTrackError>::failure(SubscribeDataTrackError{code, message});
}

std::optional<FfiClient::AsyncId> ExtractAsyncId(const proto::FfiEvent& event) {
using E = proto::FfiEvent;
switch (event.message_case()) {
Expand Down Expand Up @@ -651,18 +657,17 @@ Result<proto::OwnedDataTrackStream, SubscribeDataTrackError> FfiClient::subscrib
try {
const proto::FfiResponse resp = sendRequest(req);
if (!resp.has_subscribe_data_track()) {
return Result<proto::OwnedDataTrackStream, SubscribeDataTrackError>::failure(SubscribeDataTrackError{
SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "FfiResponse missing subscribe_data_track"});
return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::PROTOCOL_ERROR,
"FfiResponse missing subscribe_data_track");
}
if (!resp.subscribe_data_track().has_stream()) {
return Result<proto::OwnedDataTrackStream, SubscribeDataTrackError>::failure(SubscribeDataTrackError{
SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "FfiResponse subscribe_data_track missing stream"});
return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::PROTOCOL_ERROR,
"FfiResponse subscribe_data_track missing stream");
}
proto::OwnedDataTrackStream sub = resp.subscribe_data_track().stream();
return Result<proto::OwnedDataTrackStream, SubscribeDataTrackError>::success(std::move(sub));
} catch (const std::exception& e) { // NOLINT(bugprone-empty-catch)
return Result<proto::OwnedDataTrackStream, SubscribeDataTrackError>::failure(
SubscribeDataTrackError{SubscribeDataTrackErrorCode::INTERNAL, e.what()});
return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::INTERNAL, e.what());
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/remote_data_track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "data_track.pb.h"
#include "ffi.pb.h"
#include "ffi_client.h"
#include "lk_log.h"

namespace livekit {

Expand Down Expand Up @@ -48,6 +49,9 @@ bool RemoteDataTrack::isPublished() const {
Result<std::shared_ptr<DataTrackStream>, SubscribeDataTrackError> RemoteDataTrack::subscribe(
const DataTrackStream::Options& options) {
if (!handle_.valid()) {
LK_LOG_WARN("Subscribe data track failed: code={} message={}",
static_cast<std::uint32_t>(SubscribeDataTrackErrorCode::INVALID_HANDLE),
"RemoteDataTrack::subscribe: invalid FFI handle");
return Result<std::shared_ptr<DataTrackStream>, SubscribeDataTrackError>::failure(
SubscribeDataTrackError{SubscribeDataTrackErrorCode::INVALID_HANDLE,
"RemoteDataTrack::subscribe: invalid FFI "
Expand Down
7 changes: 7 additions & 0 deletions src/subscription_thread_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,13 @@ std::thread SubscriptionThreadDispatcher::startDataReaderLocked(DataFrameCallbac
LK_LOG_ERROR("Data frame callback exception: {}", e.what());
}
}
const auto error = stream->terminalError();
if (error.has_value()) {
LK_LOG_ERROR(
"Data reader stream ended with subscription error for \"{}\" from "
"\"{}\": code={} message={}",
track_name, identity, static_cast<std::uint32_t>(error->code), error->message);
}
LK_LOG_INFO("Data reader thread exiting for \"{}\" track=\"{}\"", identity, track_name);
});
// NOLINTEND(bugprone-lambda-function-name)
Expand Down
2 changes: 1 addition & 1 deletion src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ if(UNIT_TEST_SOURCES)
PRIVATE
livekit
spdlog::spdlog
$<$<PLATFORM_ID:Windows>:${LIVEKIT_PROTOBUF_TARGET}>
${LIVEKIT_PROTOBUF_TARGET}
Comment thread
stephen-derosa marked this conversation as resolved.
GTest::gtest_main
)

Expand Down
54 changes: 54 additions & 0 deletions src/tests/integration/test_data_track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,60 @@ TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) {
<< "Remote track did not report unpublished state";
}

TEST_F(DataTrackE2ETest, SubscribeAfterUnpublishReportsTerminalError) {
const auto track_name = makeTrackName("subscribe_after_unpublish");

DataTrackPublishedDelegate subscriber_delegate;
std::vector<TestRoomConnectionOptions> room_configs(2);
room_configs[1].delegate = &subscriber_delegate;

auto rooms = testRooms(room_configs);
auto& publisher_room = rooms[0];

auto local_track = requirePublishedTrack(publisher_room->localParticipant(), track_name);
ASSERT_TRUE(local_track->isPublished());

auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout);
ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track";
ASSERT_TRUE(remote_track->isPublished());

local_track->unpublishDataTrack();
ASSERT_FALSE(local_track->isPublished());
ASSERT_TRUE(waitForCondition([&]() { return !remote_track->isPublished(); }, 2s))
<< "Remote track did not report unpublished state";

auto subscribe_result = remote_track->subscribe();
if (!subscribe_result) {
FAIL() << "Expected subscribe to return a stream before terminal EOS: "
<< describeDataTrackError(subscribe_result.error());
}
auto subscription = subscribe_result.value();

std::promise<bool> read_promise;
auto read_future = read_promise.get_future();
std::thread reader([subscription, promise = std::move(read_promise)]() mutable {
DataTrackFrame frame;
promise.set_value(subscription->read(frame));
});

const auto read_status = read_future.wait_for(5s);
if (read_status != std::future_status::ready) {
subscription->close();
}
reader.join();

// TODO(BOT-347): this sometimes fails with a timeout.
ASSERT_EQ(read_status, std::future_status::ready) << "Timed out waiting for terminal data-track EOS";
EXPECT_FALSE(read_future.get()) << "Unpublished track subscription unexpectedly delivered a frame";

const auto terminal_error = subscription->terminalError();
ASSERT_TRUE(terminal_error.has_value()) << "Expected terminal subscribe error on EOS";
// EXPECT_EQ(terminal_error->code, SubscribeDataTrackErrorCode::UNPUBLISHED);
// should this actually be internal?
EXPECT_EQ(terminal_error->code, SubscribeDataTrackErrorCode::INTERNAL);
EXPECT_FALSE(terminal_error->message.empty());
}

TEST_F(DataTrackE2ETest, PublishManyTracks) {
auto rooms = testRooms(1);
auto& room = rooms[0];
Expand Down
Loading
Loading