Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ set(ICEBERG_SOURCES
update/fast_append.cc
update/merging_snapshot_update.cc
update/pending_update.cc
update/row_delta.cc
update/set_snapshot.cc
update/snapshot_manager.cc
update/snapshot_update.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ iceberg_sources = files(
'update/fast_append.cc',
'update/merging_snapshot_update.cc',
'update/pending_update.cc',
'update/row_delta.cc',
'update/set_snapshot.cc',
'update/snapshot_manager.cc',
'update/snapshot_update.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "iceberg/transaction.h"
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/fast_append.h"
#include "iceberg/update/row_delta.h"
#include "iceberg/update/set_snapshot.h"
#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/update_location.h"
Expand Down Expand Up @@ -217,6 +218,12 @@ Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
return FastAppend::Make(name().name, std::move(ctx));
}

Result<std::shared_ptr<RowDelta>> Table::NewRowDelta() {
ICEBERG_ASSIGN_OR_RAISE(
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
return RowDelta::Make(name().name, std::move(ctx));
}

Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
ICEBERG_ASSIGN_OR_RAISE(
auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate));
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// \brief Create a new FastAppend to append data files and commit the changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();

/// \brief Create a new RowDelta to add rows and row-level deletes.
virtual Result<std::shared_ptr<RowDelta>> NewRowDelta();

/// \brief Create a new SnapshotManager to manage snapshots and snapshot references.
virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();

Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ if(ICEBERG_BUILD_BUNDLE)
manifest_filter_manager_test.cc
merging_snapshot_update_test.cc
name_mapping_update_test.cc
row_delta_test.cc
snapshot_manager_test.cc
transaction_test.cc
update_location_test.cc
Expand Down
287 changes: 287 additions & 0 deletions src/iceberg/test/row_delta_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/row_delta.h"

#include <memory>
#include <optional>
#include <string>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/avro/avro_register.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/update/fast_append.h"

namespace iceberg {

class RowDeltaTest : public MinimalUpdateTestBase {
protected:
static void SetUpTestSuite() { avro::RegisterAll(); }

void SetUp() override {
MinimalUpdateTestBase::SetUp();

ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());

file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
}

std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t partition_x) {
auto file = std::make_shared<DataFile>();
file->content = DataFile::Content::kData;
file->file_path = table_location_ + path;
file->file_format = FileFormatType::kParquet;
file->partition = PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
file->file_size_in_bytes = 1024;
file->record_count = 100;
file->partition_spec_id = spec_->spec_id();
return file;
}

std::shared_ptr<DataFile> MakeDeleteFile(const std::string& path, int64_t partition_x) {
auto file = MakeDataFile(path, partition_x);
file->content = DataFile::Content::kPositionDeletes;
file->file_size_in_bytes = 256;
file->record_count = 7;
return file;
}

std::shared_ptr<DataFile> MakeDeletionVector(const std::string& path,
const std::string& referenced_data_file,
int64_t partition_x,
int64_t content_offset = 0) {
auto file = MakeDeleteFile(path, partition_x);
file->file_format = FileFormatType::kPuffin;
file->referenced_data_file = referenced_data_file;
file->content_offset = content_offset;
file->content_size_in_bytes = 10;
return file;
}

void CommitFileA() {
ICEBERG_UNWRAP_OR_FAIL(auto fast_append, table_->NewFastAppend());
fast_append->AppendFile(file_a_);
EXPECT_THAT(fast_append->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
}

void SetTableFormatVersion(int8_t format_version) {
table_->metadata()->format_version = format_version;
}

std::shared_ptr<PartitionSpec> spec_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<DataFile> file_a_;
std::shared_ptr<DataFile> file_b_;
};

TEST_F(RowDeltaTest, AddRowsCommitsAppendOperation) {
std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->AddRows(file_a_);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedRecords), "100");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedFileSize), "1024");
}

TEST_F(RowDeltaTest, AddDeletesCommitsDeleteOperation) {
auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet",
/*partition_x=*/1L);

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->AddDeletes(delete_file);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeletes), "7");
}

TEST_F(RowDeltaTest, RemoveRowsCommitsOverwriteOperation) {
CommitFileA();

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->RemoveRows(file_a_);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kOverwrite));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedRecords), "100");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedFileSize), "1024");
}

TEST_F(RowDeltaTest, RemoveRowsAndAddDeletesCommitsDeleteOperation) {
CommitFileA();

auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet",
/*partition_x=*/1L);

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->RemoveRows(file_a_);
row_delta->AddDeletes(delete_file);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1");
}

TEST_F(RowDeltaTest, AddRowsAndRemoveDeletesCommitsAppendOperation) {
auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet",
/*partition_x=*/1L);
{
std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->AddDeletes(delete_file);
EXPECT_THAT(row_delta->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
}

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->AddRows(file_a_);
row_delta->RemoveDeletes(delete_file);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), "1");
}

TEST_F(RowDeltaTest, CannotRemoveReferencedDataFile) {
CommitFileA();

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
std::vector<std::string> referenced_files{file_a_->file_path};
row_delta->ValidateDataFilesExist(referenced_files);
row_delta->RemoveRows(file_a_);

auto result = row_delta->Commit();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("Cannot delete data files"));
EXPECT_THAT(result, HasErrorMessage(file_a_->file_path));
}

TEST_F(RowDeltaTest, AddDeleteFileForRemovedDataFileCommitsDeleteOperation) {
CommitFileA();

auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet",
/*partition_x=*/1L);
delete_file->referenced_data_file = file_a_->file_path;

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->RemoveRows(file_a_);
row_delta->AddDeletes(delete_file);

EXPECT_THAT(row_delta->Commit(), IsOk());

EXPECT_THAT(table_->Refresh(), IsOk());
ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete));
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1");
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1");
}

TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingRowsOnEmptyTable) {
std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->ValidateDeletedFiles();
row_delta->RemoveRows(file_a_);

EXPECT_THAT(row_delta->Commit(), IsOk());
}

TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingDeletesOnEmptyTable) {
auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet",
/*partition_x=*/1L);

std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->ValidateDeletedFiles();
row_delta->RemoveDeletes(delete_file);

EXPECT_THAT(row_delta->Commit(), IsOk());
}

TEST_F(RowDeltaTest, AddDeletionVectorValidatesConcurrentDVs) {
CommitFileA();
ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot());
SetTableFormatVersion(3);

auto concurrent_dv =
MakeDeletionVector("/delete/concurrent-dv-a.puffin", file_a_->file_path,
/*partition_x=*/1L, /*content_offset=*/0);
std::shared_ptr<RowDelta> concurrent_delta;
ICEBERG_UNWRAP_OR_FAIL(concurrent_delta, table_->NewRowDelta());
concurrent_delta->AddDeletes(concurrent_dv);
EXPECT_THAT(concurrent_delta->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
SetTableFormatVersion(3);

auto dv = MakeDeletionVector("/delete/dv-a.puffin", file_a_->file_path,
/*partition_x=*/1L, /*content_offset=*/10);
std::shared_ptr<RowDelta> row_delta;
ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta());
row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id);
row_delta->AddDeletes(dv);

auto result = row_delta->Commit();
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
EXPECT_THAT(result, HasErrorMessage("Found concurrently added DV"));
EXPECT_THAT(result, HasErrorMessage(file_a_->file_path));
}

} // namespace iceberg
8 changes: 8 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/fast_append.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/row_delta.h"
#include "iceberg/update/set_snapshot.h"
#include "iceberg/update/snapshot_manager.h"
#include "iceberg/update/snapshot_update.h"
Expand Down Expand Up @@ -478,6 +479,13 @@ Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
return fast_append;
}

Result<std::shared_ptr<RowDelta>> Transaction::NewRowDelta() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<RowDelta> row_delta,
RowDelta::Make(ctx_->table->name().name, ctx_));
ICEBERG_RETURN_UNEXPECTED(AddUpdate(row_delta));
return row_delta;
}

Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateStatistics> update_statistics,
UpdateStatistics::Make(ctx_));
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// \brief Create a new FastAppend to append data files and commit the changes.
Result<std::shared_ptr<FastAppend>> NewFastAppend();

/// \brief Create a new RowDelta to add rows and row-level deletes.
Result<std::shared_ptr<RowDelta>> NewRowDelta();

/// \brief Create a new SnapshotManager to manage snapshots.
Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();

Expand Down
1 change: 1 addition & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class TransactionContext;
class ExpireSnapshots;
class FastAppend;
class PendingUpdate;
class RowDelta;
class SetSnapshot;
class SnapshotManager;
class SnapshotUpdate;
Expand Down
Loading
Loading