[lake/iceberg] Support MAP type in Iceberg tables#2367
Pull request overview
This pull request adds support for MAP data types in Iceberg tables for Fluss, addressing issue #2258. The implementation enables conversion of Fluss MAP types to Iceberg MapType and provides runtime support for reading and writing map data.
Changes:
- Implemented MAP type conversion from Fluss to Iceberg format in FlussDataTypeToIcebergDataType
- Added FlussMapAsIcebergMap adapter class for converting Fluss InternalMap to Java Map for Iceberg
- Enhanced FlussArrayAsIcebergList and FlussRowAsIcebergRecord to support nested MAP types
- Added FLOAT and DOUBLE support to IcebergBinaryRowWriter.createFieldWriter
- Added explicit error handling for MAP and ARRAY types in bucket key writer
- Comprehensive test coverage for MAP type conversions, including nested structures
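The adapter idea in the list above (exposing a columnar map as a java.util.Map) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from this PR: KeyValueArrays is a hypothetical stand-in for a Fluss InternalMap, and the element converters are plain functions supplied by the caller.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a columnar map: keys and values live in parallel arrays.
final class KeyValueArrays {
    final Object[] keys;
    final Object[] values;

    KeyValueArrays(Object[] keys, Object[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        this.keys = keys;
        this.values = values;
    }
}

// Read-only java.util.Map view; entries are converted once, on first access.
final class MapAdapterSketch extends AbstractMap<Object, Object> {
    private final KeyValueArrays source;
    private final Function<Object, Object> keyConverter;
    private final Function<Object, Object> valueConverter;
    private Map<Object, Object> materialized;

    MapAdapterSketch(
            KeyValueArrays source,
            Function<Object, Object> keyConverter,
            Function<Object, Object> valueConverter) {
        this.source = source;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    @Override
    public Set<Map.Entry<Object, Object>> entrySet() {
        if (materialized == null) {
            Map<Object, Object> result = new LinkedHashMap<>();
            for (int i = 0; i < source.keys.length; i++) {
                Object value = source.values[i];
                // Values may be null; keys are assumed to be non-null in this sketch.
                result.put(
                        keyConverter.apply(source.keys[i]),
                        value == null ? null : valueConverter.apply(value));
            }
            materialized = Collections.unmodifiableMap(result);
        }
        return materialized.entrySet();
    }
}
```

Extending AbstractMap keeps the sketch short because get, size, and containsKey are all derived from entrySet; a real adapter would pick converters based on the key and value data types.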
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| FlussDataTypeToIcebergDataType.java | Replaced UnsupportedOperationException with actual MAP type conversion logic, handling field ID allocation |
| FlussMapAsIcebergMap.java | New adapter class for converting Fluss InternalMap to Java Map, with support for all scalar types and nested collections |
| FlussRowAsIcebergRecord.java | Added MAP type handling in field converter creation |
| FlussArrayAsIcebergList.java | Added support for MAP elements within arrays |
| IcebergBinaryRowWriter.java | Added FLOAT/DOUBLE type support and explicit error messages for non-scalar bucket key types |
| FlussDataTypeToIcebergDataTypeMapTest.java | New comprehensive test suite covering MAP type conversions with various key-value type combinations |
| FlussRowAsIcebergRecordTest.java | Added integration tests for MAP data conversion including nested structures |
| IcebergBinaryRowWriterTest.java | Added tests to verify MAP and ARRAY types are properly rejected as bucket keys |
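The "field ID allocation" mentioned for FlussDataTypeToIcebergDataType.java refers to the fact that an Iceberg map carries separate field IDs for its key and its value, so nested maps need fresh IDs at every level. The sketch below illustrates one way to do that; the SourceType model, the string output, and the allocation order (a map's own IDs are reserved before descending) are all assumptions made for illustration and may differ from what the PR does.

```java
// Minimal sketch of ID allocation for nested map types.
// All types here are hypothetical stand-ins, not Fluss or Iceberg classes.
final class FieldIdSketch {

    interface SourceType {}

    static final class Scalar implements SourceType {
        final String name;

        Scalar(String name) {
            this.name = name;
        }
    }

    static final class MapOf implements SourceType {
        final SourceType key;
        final SourceType value;

        MapOf(SourceType key, SourceType value) {
            this.key = key;
            this.value = value;
        }
    }

    private int nextId;

    FieldIdSketch(int firstFreeId) {
        this.nextId = firstFreeId;
    }

    // Renders the converted type as text so the assigned IDs are visible.
    String convert(SourceType type) {
        if (type instanceof Scalar) {
            return ((Scalar) type).name;
        }
        if (type instanceof MapOf) {
            MapOf map = (MapOf) type;
            // Assumption: reserve this map's key and value IDs first, then recurse.
            int keyId = nextId++;
            int valueId = nextId++;
            String key = convert(map.key);
            String value = convert(map.value);
            return "map<" + keyId + ": " + key + ", " + valueId + ": " + value + ">";
        }
        throw new UnsupportedOperationException(
                "Unsupported type: " + type.getClass().getSimpleName());
    }
}
```

Keeping the counter on the converter instance means each nested map draws from the same sequence, which is what prevents two fields from sharing an ID.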
Comments suppressed due to low confidence (1)
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java:118
- The get method is missing support for RowType, which is a supported nested type in Arrays. When an Array contains Row values (e.g., Array<Row<id: Int, name: String>>), this method will throw an UnsupportedOperationException. Add a case to handle RowType by retrieving the InternalRow from the array and converting it appropriately, similar to how it's done in FlussRowAsIcebergRecord.createTypeConverter.
    public Object get(int index) {
        if (flussArray.isNullAt(index)) {
            return null;
        }
        if (elementType instanceof BooleanType) {
            return flussArray.getBoolean(index);
        } else if (elementType instanceof TinyIntType) {
            return (int) flussArray.getByte(index);
        } else if (elementType instanceof SmallIntType) {
            return (int) flussArray.getShort(index);
        } else if (elementType instanceof IntType) {
            return flussArray.getInt(index);
        } else if (elementType instanceof BigIntType) {
            return flussArray.getLong(index);
        } else if (elementType instanceof FloatType) {
            return flussArray.getFloat(index);
        } else if (elementType instanceof DoubleType) {
            return flussArray.getDouble(index);
        } else if (elementType instanceof StringType) {
            return flussArray.getString(index).toString();
        } else if (elementType instanceof CharType) {
            CharType charType = (CharType) elementType;
            return flussArray.getChar(index, charType.getLength()).toString();
        } else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
            return ByteBuffer.wrap(flussArray.getBytes(index));
        } else if (elementType instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) elementType;
            return flussArray
                    .getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
                    .toBigDecimal();
        } else if (elementType instanceof LocalZonedTimestampType) {
            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
            return toIcebergTimestampLtz(
                    flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
        } else if (elementType instanceof TimestampType) {
            TimestampType tsType = (TimestampType) elementType;
            return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
        } else if (elementType instanceof DateType) {
            return DateTimeUtils.toLocalDate(flussArray.getInt(index));
        } else if (elementType instanceof TimeType) {
            return DateTimeUtils.toLocalTime(flussArray.getInt(index));
        } else if (elementType instanceof ArrayType) {
            InternalArray innerArray = flussArray.getArray(index);
            return innerArray == null
                    ? null
                    : new FlussArrayAsIcebergList(
                            innerArray, ((ArrayType) elementType).getElementType());
        } else if (elementType instanceof MapType) {
            MapType mapType = (MapType) elementType;
            InternalMap internalMap = flussArray.getMap(index);
            return new FlussMapAsIcebergMap(
                    internalMap, mapType.getKeyType(), mapType.getValueType());
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported array element type conversion for Fluss type: "
                            + elementType.getClass().getSimpleName());
        }
    }
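The suppressed comment asks for one more branch so that row elements are converted instead of falling through to the exception. The following self-contained sketch shows the shape of such a branch. Everything in it is a hypothetical stand-in: Kind replaces the Fluss DataType subclasses, an Object[] replaces InternalArray and InternalRow, and a LinkedHashMap replaces the record wrapper. An actual fix would presumably wrap the nested row the way FlussRowAsIcebergRecord.createTypeConverter does, which is not shown here.

```java
import java.util.AbstractList;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a list view whose get(...) dispatches on the element kind,
// including a branch for nested rows. All names are illustrative only.
final class ArrayViewSketch extends AbstractList<Object> {

    enum Kind {
        INT,
        STRING,
        ROW
    }

    private final Object[] elements; // stand-in for the underlying array
    private final Kind kind;
    private final String[] rowFieldNames; // only used when kind == ROW

    ArrayViewSketch(Object[] elements, Kind kind, String[] rowFieldNames) {
        this.elements = elements;
        this.kind = kind;
        this.rowFieldNames = rowFieldNames;
    }

    @Override
    public Object get(int index) {
        Object raw = elements[index];
        if (raw == null) {
            return null;
        }
        switch (kind) {
            case INT:
            case STRING:
                return raw;
            case ROW:
                // The added branch: expose the nested row as a record-like map.
                Object[] fields = (Object[]) raw;
                Map<String, Object> record = new LinkedHashMap<>();
                for (int i = 0; i < rowFieldNames.length; i++) {
                    record.put(rowFieldNames[i], fields[i]);
                }
                return record;
            default:
                throw new UnsupportedOperationException("Unsupported element kind: " + kind);
        }
    }

    @Override
    public int size() {
        return elements.length;
    }
}
```

The null check stays ahead of the dispatch, mirroring the isNullAt guard in the quoted method, so a null row element is returned as null rather than converted.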
...ss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java (resolved)
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java (resolved)
...lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java (resolved)
fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java (resolved)
Force-pushed from ea54108 to c5156b9
All issues raised by Copilot have been resolved. @luoyuxia

@MehulBatra Could you please help review this PR?

Already on my radar, will try to review by EOD! @pithecuse527 @luoyuxia

@pithecuse527 can you run
Force-pushed from c5156b9 to 62696c5
@MehulBatra
Force-pushed from 62696c5 to 572accb
luoyuxia left a comment
@pithecuse527 Thanks for the PR. I left some comments. PTAL.
Also, please don't forget to add an IT in FlinkUnionReadLogTableITCase. Refer to PR #2278.
...eberg/src/test/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataTypeMapTest.java (outdated, resolved)
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java (outdated, resolved)
Force-pushed from 572accb to 300c831
I've added the integration test for iceberg map in
luoyuxia left a comment
@pithecuse527 Thanks for the PR. LGTM overall. Left minor comments.
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java (outdated, resolved)
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java (outdated, resolved)
Force-pushed from 739696c to 2de888c
luoyuxia left a comment
@pithecuse527 Thanks for the PR. LGTM!
@pithecuse527 But the CI still fails. Could you please also update the doc for
Force-pushed from 2de888c to 8e36fde
Sorry, it was because of the IT failure. Also, the documentation for the Iceberg map type mapping has been updated.
luoyuxia left a comment
@pithecuse527 Thanks. LGTM. Will merge once CI passes.
@pithecuse527 I tried to push a commit to your branch to fix CI, but permission was denied. to
Force-pushed from 8e36fde to 59b2230
Purpose
Linked issue: #2258
Brief change log
Tests
API and Format
Documentation