From 072b0f211833098d729a7ea535297b2e1f9c3331 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 13 Jan 2026 19:46:12 +0800 Subject: [PATCH 1/2] [server] When applying projection pushdown, return empty records instead of skipping them to ensure offset movement in the client. --- .../admin/ClientToServerITCaseBase.java | 2 +- .../fluss/client/table/FlussTableITCase.java | 72 +++++++++++++- .../fluss/record/FileLogProjection.java | 6 +- .../apache/fluss/server/kv/KvTabletTest.java | 96 ++++++++++--------- 4 files changed, 126 insertions(+), 50 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java index 311d5a94ff..940b0388f6 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java @@ -110,7 +110,7 @@ protected long createTable( return admin.getTableInfo(tablePath).get().getTableId(); } - private static Configuration initConfig() { + protected static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); // set a shorter interval for testing purpose diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index a65529648f..72655ef61f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.lookup.LookupResult; import 
org.apache.fluss.client.lookup.Lookuper; @@ -35,6 +36,7 @@ import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.TestFileSystem; import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.MergeEngineType; @@ -49,6 +51,7 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.indexed.IndexedRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; @@ -71,6 +74,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows; +import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; @@ -1160,6 +1164,26 @@ void testInvalidColumnProjection() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testFirstRowMergeEngine(boolean doProjection) throws Exception { + Configuration conf = initConfig(); + // To better mock the issue https://github.com/apache/fluss/issues/2369: + // 1. disable remote log task so that won't read remote log. + // 2. Set LOG_SEGMENT_FILE_SIZE to make sure one segment before last segment is contain + // empty batch at the end. + // In this way, if skip empty batch, the read will in stuck forever. 
+ conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO); + conf.set( + ConfigOptions.LOG_SEGMENT_FILE_SIZE, + new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); + conf.set( + ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, + new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); + final FlussClusterExtension flussClusterExtension = + FlussClusterExtension.builder() + .setNumOfTabletServers(3) + .setClusterConf(conf) + .build(); + flussClusterExtension.start(); + TableDescriptor tableDescriptor = TableDescriptor.builder() .schema(DATA1_SCHEMA_PK) @@ -1171,13 +1195,22 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { "test_first_row_merge_engine_with_%s", doProjection ? "projection" : "no_projection"); TablePath tablePath = TablePath.of("test_db_1", tableName); - createTable(tablePath, tableDescriptor, false); int rows = 5; int duplicateNum = 10; int batchSize = 3; int count = 0; - try (Table table = conn.getTable(tablePath)) { + // Case1: Test normal update to generator not empty cdc logs. + Table table = null; + LogScanner logScanner = null; + try (Connection connection = + ConnectionFactory.createConnection( + flussClusterExtension.getClientConfig()); + Admin admin = connection.getAdmin()) { + admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false) + .get(); + admin.createTable(tablePath, tableDescriptor, false).get(); + table = connection.getTable(tablePath); // first, put rows UpsertWriter upsertWriter = table.newUpsert().createWriter(); List expectedScanRows = new ArrayList<>(rows); @@ -1208,7 +1241,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { scan = scan.project(new int[] {0}); // do projection. 
} - LogScanner logScanner = scan.createLogScanner(); + logScanner = scan.createLogScanner(); logScanner.subscribeFromBeginning(0); List actualLogRecords = new ArrayList<>(0); @@ -1216,7 +1249,6 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); scanRecords.forEach(actualLogRecords::add); } - logScanner.close(); assertThat(actualLogRecords).hasSize(rows); for (int i = 0; i < actualLogRecords.size(); i++) { ScanRecord scanRecord = actualLogRecords.get(i); @@ -1225,6 +1257,38 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType) .isEqualTo(expectedScanRows.get(i)); } + + // Case2: Test all the update in the write batch are duplicate(Thus generate empty cdc + // logs). + // insert duplicate rows again to generate empty cdc log. + for (int num = 0; num < duplicateNum; num++) { + upsertWriter.upsert(row(0, "value_" + num)); + upsertWriter.flush(); + } + + // insert a new row. + upsertWriter.upsert(row(rows + 1, "new_value")); + + actualLogRecords = new ArrayList<>(0); + while (actualLogRecords.isEmpty()) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + scanRecords.forEach(actualLogRecords::add); + } + logScanner.close(); + assertThat(actualLogRecords).hasSize(1); + ScanRecord scanRecord = actualLogRecords.get(0); + assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT); + assertThatRow(scanRecord.getRow()) + .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType) + .isEqualTo(doProjection ? 
row(rows + 1) : row(rows + 1, "new_value")); + } finally { + if (logScanner != null) { + logScanner.close(); + } + if (table != null) { + table.close(); + } + flussClusterExtension.close(); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java index 4717271c24..7a4cfe55d4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java @@ -140,7 +140,7 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int // So we use V0 header size here for a conservative judgment. In the end, the condition // of (position >= end - recordBatchHeaderSize) will ensure the final correctness. while (maxBytes > V0_RECORD_BATCH_HEADER_SIZE) { - if (position >= end - V0_RECORD_BATCH_HEADER_SIZE) { + if (position > end - V0_RECORD_BATCH_HEADER_SIZE) { // the remaining bytes in the file are not enough to read a batch header up to // magic. return new BytesViewLogRecords(builder.build()); @@ -166,10 +166,12 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int return new BytesViewLogRecords(builder.build()); } - // Skip empty batch. The empty batch was generated when build cdc log batch when there + // Return empty batch to push forward log offset. The empty batch was generated when + // build cdc log batch when there // is no cdc log generated for this kv batch. See the comments about the field // 'lastOffsetDelta' in DefaultLogRecordBatch. 
if (batchSizeInBytes == recordBatchHeaderSize) { + builder.addBytes(channel, position, batchSizeInBytes); position += batchSizeInBytes; continue; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 52b1080ed1..2f19328f4b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -702,26 +702,20 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { kvTablet.putAsLeader(kvRecordBatch2, null); endOffset = logTablet.localLogEndOffset(); assertThat(endOffset).isEqualTo(offsetBefore + i + 1); + MemoryLogRecords emptyLogs = + logRecords( + readLogRowType, + offsetBefore + i, + Collections.emptyList(), + Collections.emptyList()); + MultiBytesView bytesView = + MultiBytesView.builder() + .addBytes( + expectedLogs.getMemorySegment(), 0, expectedLogs.sizeInBytes()) + .addBytes(emptyLogs.getMemorySegment(), 0, emptyLogs.sizeInBytes()) + .build(); + expectedLogs = MemoryLogRecords.pointToBytesView(bytesView); - // the empty batch will be read if no projection, - // the empty batch will not be read if has projection - if (!doProjection) { - MemoryLogRecords emptyLogs = - logRecords( - readLogRowType, - offsetBefore + i, - Collections.emptyList(), - Collections.emptyList()); - MultiBytesView bytesView = - MultiBytesView.builder() - .addBytes( - expectedLogs.getMemorySegment(), - 0, - expectedLogs.sizeInBytes()) - .addBytes(emptyLogs.getMemorySegment(), 0, emptyLogs.sizeInBytes()) - .build(); - expectedLogs = MemoryLogRecords.pointToBytesView(bytesView); - } actualLogRecords = readLogRecords(logTablet, 0, logProjection); assertThatLogRecords(actualLogRecords) .withSchema(readLogRowType) @@ -898,31 +892,25 @@ void testVersionRowMergeEngine(boolean doProjection) throws Exception { endOffset = logTablet.localLogEndOffset(); 
assertThat(endOffset).isEqualTo(offsetBefore + i + 1); - // the empty batch will be read if no projection, - // the empty batch will not be read if has projection - if (!doProjection) { - MemoryLogRecords emptyLogs = - logRecords( - readLogRowType, - offsetBefore + i, - Collections.emptyList(), - Collections.emptyList()); - MultiBytesView bytesView = - MultiBytesView.builder() - .addBytes( - expectedLogs.getMemorySegment(), - 0, - expectedLogs.sizeInBytes()) - .addBytes(emptyLogs.getMemorySegment(), 0, emptyLogs.sizeInBytes()) - .build(); - expectedLogs = MemoryLogRecords.pointToBytesView(bytesView); - } - actualLogRecords = readLogRecords(logTablet, 0, logProjection); - assertThatLogRecords(actualLogRecords) - .withSchema(readLogRowType) - .assertCheckSum(!doProjection) - .isEqualTo(expectedLogs); + MemoryLogRecords emptyLogs = + logRecords( + readLogRowType, + offsetBefore + i, + Collections.emptyList(), + Collections.emptyList()); + MultiBytesView bytesView = + MultiBytesView.builder() + .addBytes( + expectedLogs.getMemorySegment(), 0, expectedLogs.sizeInBytes()) + .addBytes(emptyLogs.getMemorySegment(), 0, emptyLogs.sizeInBytes()) + .build(); + expectedLogs = MemoryLogRecords.pointToBytesView(bytesView); } + actualLogRecords = readLogRecords(logTablet, 0, logProjection); + assertThatLogRecords(actualLogRecords) + .withSchema(readLogRowType) + .assertCheckSum(!doProjection) + .isEqualTo(expectedLogs); List kvData3 = Arrays.asList( @@ -1333,6 +1321,28 @@ private MemoryLogRecords logRecords( DEFAULT_COMPRESSION); } + private MemoryLogRecords logRecords( + RowType rowType, + long baseOffset, + List changeTypes, + List values, + short schemaId, + int magicValue) + throws Exception { + return createBasicMemoryLogRecords( + rowType, + schemaId, + baseOffset, + -1L, + CURRENT_LOG_MAGIC_VALUE, + NO_WRITER_ID, + NO_BATCH_SEQUENCE, + changeTypes, + values, + LogFormat.ARROW, + DEFAULT_COMPRESSION); + } + private void checkEqual( LogRecords actaulLogRecords, List 
expectedLogs, RowType rowType) { LogTestBase.assertLogRecordsListEquals(expectedLogs, actaulLogRecords, rowType); From 71031534f9d5e1510e9d891773c5c5b260ff4b59 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 22 Jan 2026 11:01:58 +0800 Subject: [PATCH 2/2] Address code review comments --- .../admin/ClientToServerITCaseBase.java | 2 +- .../admin/CustomFlussClusterITCase.java | 204 ++++++++++++++++++ .../fluss/client/table/FlussTableITCase.java | 72 +------ .../apache/fluss/server/kv/KvTabletTest.java | 22 -- 4 files changed, 209 insertions(+), 91 deletions(-) create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java index 940b0388f6..311d5a94ff 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java @@ -110,7 +110,7 @@ protected long createTable( return admin.getTableInfo(tablePath).get().getTableId(); } - protected static Configuration initConfig() { + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); // set a shorter interval for testing purpose diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java new file mode 100644 index 0000000000..28b725595b --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/CustomFlussClusterITCase.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.ScanRecord; +import org.apache.fluss.client.table.scanner.log.LogScanner; +import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.DatabaseDescriptor; +import org.apache.fluss.metadata.MergeEngineType; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static 
org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for tests that require manual cluster management. */ +class CustomFlussClusterITCase { + + @Test + void testProjectionPushdownWithEmptyBatches() throws Exception { + Configuration conf = initConfig(); + // Configuration to reproduce the issue described in + // https://github.com/apache/fluss/issues/2369: + // 1. Disable remote log task to prevent reading from remote log storage. + // 2. Set LOG_SEGMENT_FILE_SIZE to ensure that the segment before the last segment + // contains an empty batch at the end. + // This setup causes the scanner to block indefinitely if it incorrectly skips empty batches + // during projection pushdown, as it will wait forever for non-empty data that never + // arrives. 
+ conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO); + conf.set( + ConfigOptions.LOG_SEGMENT_FILE_SIZE, + new MemorySize(2 * V0_RECORD_BATCH_HEADER_SIZE)); + conf.set( + ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, + new MemorySize(2 * V0_RECORD_BATCH_HEADER_SIZE)); + final FlussClusterExtension flussClusterExtension = + FlussClusterExtension.builder() + .setNumOfTabletServers(3) + .setClusterConf(conf) + .build(); + flussClusterExtension.start(); + + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA1_SCHEMA_PK) + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngineType.FIRST_ROW) + .distributedBy(1) + .build(); + RowType rowType = DATA1_SCHEMA_PK.getRowType(); + TablePath tablePath = + TablePath.of("test_db_1", "test_projection_pushdown_with_empty_batches"); + + int rows = 5; + int duplicateNum = 2; + int batchSize = 3; + int count = 0; + // Case 1: Test normal updates that generate non-empty cdc logs. + Table table = null; + LogScanner logScanner = null; + try (Connection connection = + ConnectionFactory.createConnection( + flussClusterExtension.getClientConfig()); + Admin admin = connection.getAdmin()) { + admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false) + .get(); + admin.createTable(tablePath, tableDescriptor, false).get(); + table = connection.getTable(tablePath); + // first, put rows + UpsertWriter upsertWriter = table.newUpsert().createWriter(); + List expectedScanRows = new ArrayList<>(rows); + List expectedLookupRows = new ArrayList<>(rows); + for (int id = 0; id < rows; id++) { + for (int num = 0; num < duplicateNum; num++) { + upsertWriter.upsert(row(id, "value_" + num)); + if (count++ > batchSize) { + upsertWriter.flush(); + count = 0; + } + } + + expectedLookupRows.add(row(id, "value_0")); + expectedScanRows.add(row(id)); + } + + upsertWriter.flush(); + + Lookuper lookuper = table.newLookup().createLookuper(); + // now, get rows by lookup + for (int id = 
0; id < rows; id++) { + InternalRow gotRow = lookuper.lookup(row(id)).get().getSingletonRow(); + assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedLookupRows.get(id)); + } + + Scan scan = table.newScan().project(new int[] {0}); + logScanner = scan.createLogScanner(); + + logScanner.subscribeFromBeginning(0); + List actualLogRecords = new ArrayList<>(0); + while (actualLogRecords.size() < rows) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + scanRecords.forEach(actualLogRecords::add); + } + assertThat(actualLogRecords).hasSize(rows); + for (int i = 0; i < actualLogRecords.size(); i++) { + ScanRecord scanRecord = actualLogRecords.get(i); + assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT); + assertThatRow(scanRecord.getRow()) + .withSchema(rowType.project(new int[] {0})) + .isEqualTo(expectedScanRows.get(i)); + } + + // Case 2: Test when all updates in the write batch are duplicates (thus generating + // empty cdc logs). + // insert duplicate rows again to generate empty cdc log. + for (int num = 0; num < duplicateNum; num++) { + upsertWriter.upsert(row(0, "value_" + num)); + upsertWriter.flush(); + } + + // insert a new row.
+ upsertWriter.upsert(row(rows + 1, "new_value")); + + actualLogRecords = new ArrayList<>(0); + while (actualLogRecords.isEmpty()) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + scanRecords.forEach(actualLogRecords::add); + } + logScanner.close(); + assertThat(actualLogRecords).hasSize(1); + ScanRecord scanRecord = actualLogRecords.get(0); + assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT); + assertThatRow(scanRecord.getRow()) + .withSchema(rowType.project(new int[] {0})) + .isEqualTo(row(rows + 1)); + } finally { + if (logScanner != null) { + logScanner.close(); + } + if (table != null) { + table.close(); + } + flussClusterExtension.close(); + } + } + + protected static Configuration initConfig() { + Configuration conf = new Configuration(); + conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); + // set a shorter interval for testing purpose + conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)); + // set default datalake format for the cluster and enable datalake tables + conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON); + + conf.setString("datalake.paimon.jdbc.user", "admin"); + conf.setString("datalake.paimon.jdbc.password", "pass"); + + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb")); + conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb")); + conf.set(ConfigOptions.MAX_PARTITION_NUM, 10); + conf.set(ConfigOptions.MAX_BUCKET_NUM, 30); + + conf.set(ConfigOptions.NETTY_CLIENT_NUM_NETWORK_THREADS, 1); + return conf; + } +} diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 72655ef61f..a65529648f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -19,7 +19,6 @@ import 
org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; -import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.admin.ClientToServerITCaseBase; import org.apache.fluss.client.lookup.LookupResult; import org.apache.fluss.client.lookup.Lookuper; @@ -36,7 +35,6 @@ import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.TestFileSystem; import org.apache.fluss.metadata.DataLakeFormat; -import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.MergeEngineType; @@ -51,7 +49,6 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.indexed.IndexedRow; -import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; @@ -74,7 +71,6 @@ import java.util.concurrent.CompletableFuture; import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows; -import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; @@ -1164,26 +1160,6 @@ void testInvalidColumnProjection() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testFirstRowMergeEngine(boolean doProjection) throws Exception { - Configuration conf = initConfig(); - // To better mock the issue https://github.com/apache/fluss/issues/2369: - // 1. disable remote log task so that won't read remote log. - // 2. Set LOG_SEGMENT_FILE_SIZE to make sure one segment before last segment is contain - // empty batch at the end. - // In this way, if skip empty batch, the read will in stuck forever. 
- conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO); - conf.set( - ConfigOptions.LOG_SEGMENT_FILE_SIZE, - new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); - conf.set( - ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, - new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); - final FlussClusterExtension flussClusterExtension = - FlussClusterExtension.builder() - .setNumOfTabletServers(3) - .setClusterConf(conf) - .build(); - flussClusterExtension.start(); - TableDescriptor tableDescriptor = TableDescriptor.builder() .schema(DATA1_SCHEMA_PK) @@ -1195,22 +1171,13 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { "test_first_row_merge_engine_with_%s", doProjection ? "projection" : "no_projection"); TablePath tablePath = TablePath.of("test_db_1", tableName); + createTable(tablePath, tableDescriptor, false); int rows = 5; int duplicateNum = 10; int batchSize = 3; int count = 0; - // Case1: Test normal update to generator not empty cdc logs. - Table table = null; - LogScanner logScanner = null; - try (Connection connection = - ConnectionFactory.createConnection( - flussClusterExtension.getClientConfig()); - Admin admin = connection.getAdmin()) { - admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, false) - .get(); - admin.createTable(tablePath, tableDescriptor, false).get(); - table = connection.getTable(tablePath); + try (Table table = conn.getTable(tablePath)) { // first, put rows UpsertWriter upsertWriter = table.newUpsert().createWriter(); List expectedScanRows = new ArrayList<>(rows); @@ -1241,7 +1208,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { scan = scan.project(new int[] {0}); // do projection. 
} - logScanner = scan.createLogScanner(); + LogScanner logScanner = scan.createLogScanner(); logScanner.subscribeFromBeginning(0); List actualLogRecords = new ArrayList<>(0); @@ -1249,6 +1216,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); scanRecords.forEach(actualLogRecords::add); } + logScanner.close(); assertThat(actualLogRecords).hasSize(rows); for (int i = 0; i < actualLogRecords.size(); i++) { ScanRecord scanRecord = actualLogRecords.get(i); @@ -1257,38 +1225,6 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType) .isEqualTo(expectedScanRows.get(i)); } - - // Case2: Test all the update in the write batch are duplicate(Thus generate empty cdc - // logs). - // insert duplicate rows again to generate empty cdc log. - for (int num = 0; num < duplicateNum; num++) { - upsertWriter.upsert(row(0, "value_" + num)); - upsertWriter.flush(); - } - - // insert a new row. - upsertWriter.upsert(row(rows + 1, "new_value")); - - actualLogRecords = new ArrayList<>(0); - while (actualLogRecords.isEmpty()) { - ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); - scanRecords.forEach(actualLogRecords::add); - } - logScanner.close(); - assertThat(actualLogRecords).hasSize(1); - ScanRecord scanRecord = actualLogRecords.get(0); - assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.INSERT); - assertThatRow(scanRecord.getRow()) - .withSchema(doProjection ? rowType.project(new int[] {0}) : rowType) - .isEqualTo(doProjection ? 
row(rows + 1) : row(rows + 1, "new_value")); - } finally { - if (logScanner != null) { - logScanner.close(); - } - if (table != null) { - table.close(); - } - flussClusterExtension.close(); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 2f19328f4b..e2f0503b6c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -1321,28 +1321,6 @@ private MemoryLogRecords logRecords( DEFAULT_COMPRESSION); } - private MemoryLogRecords logRecords( - RowType rowType, - long baseOffset, - List changeTypes, - List values, - short schemaId, - int magicValue) - throws Exception { - return createBasicMemoryLogRecords( - rowType, - schemaId, - baseOffset, - -1L, - CURRENT_LOG_MAGIC_VALUE, - NO_WRITER_ID, - NO_BATCH_SEQUENCE, - changeTypes, - values, - LogFormat.ARROW, - DEFAULT_COMPRESSION); - } - private void checkEqual( LogRecords actaulLogRecords, List expectedLogs, RowType rowType) { LogTestBase.assertLogRecordsListEquals(expectedLogs, actaulLogRecords, rowType);