diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 03642b0e5547..39c2011bf486 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -53,7 +53,6 @@
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
 import java.nio.file.Files;
@@ -357,7 +356,6 @@ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception
 
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
-    @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
     public void testReadPkTable() throws Exception {
         Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
         Table table = catalog.getTable(identifier);
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 7a194574f5e5..661c5799f7d9 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -105,11 +105,6 @@ def test_read_append_table(self):
 
     @parameterized.expand(get_file_format_params())
     def test_py_write_read_pk_table(self, file_format):
-        if sys.version_info[:2] == (3, 6):
-            self.skipTest(
-                "Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). "
-                "Will be fixed in next PR."
-            )
         pa_schema = pa.schema([
             ('id', pa.int32()),
             ('name', pa.string()),
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 5e82369b6c97..3165a01069c0 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -36,26 +36,34 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
     def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
         """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
         num_rows = data.num_rows
-        enhanced_table = data
-
-        for pk_key in reversed(self.trimmed_primary_keys):
-            if pk_key in data.column_names:
+
+        new_arrays = []
+        new_names = []
+
+        for pk_key in self.trimmed_primary_keys:
+            if pk_key in data.schema.names:
                 key_column = data.column(pk_key)
-                enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
-
+                new_arrays.append(key_column)
+                new_names.append(f'_KEY_{pk_key}')
+
         sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
-        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys), '_SEQUENCE_NUMBER', sequence_column)
-
+        new_arrays.append(sequence_column)
+        new_names.append('_SEQUENCE_NUMBER')
+
         # TODO: support real row kind here
         value_kind_column = pa.array([0] * num_rows, type=pa.int8())
-        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys) + 1, '_VALUE_KIND',
-                                                   value_kind_column)
-
-        return enhanced_table
+        new_arrays.append(value_kind_column)
+        new_names.append('_VALUE_KIND')
+
+        for i in range(data.num_columns):
+            new_arrays.append(data.column(i))
+            new_names.append(data.schema.names[i])
+
+        return pa.RecordBatch.from_arrays(new_arrays, names=new_names)
 
     def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
         sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
-        if '_SEQUENCE_NUMBER' in data.column_names:
+        if '_SEQUENCE_NUMBER' in data.schema.names:
             sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
 
         sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)