Skip to content

[AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for basic types#2079

Merged
Tartarus0zm merged 2 commits intoapache:masterfrom
x-tong:feature/issue-1850-flink-arrow-part2a
Mar 10, 2026
Merged

[AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for basic types#2079
Tartarus0zm merged 2 commits intoapache:masterfrom
x-tong:feature/issue-1850-flink-arrow-part2a

Conversation

@x-tong
Copy link
Contributor

@x-tong x-tong commented Mar 9, 2026

Which issue does this PR close?

Partially addresses #1850 (Part 2a of the Flink RowData to Arrow conversion).

Rationale for this change

Per AIP-1, the Flink integration data path requires converting Flink RowData into Arrow VectorSchemaRoot for export to the native engine (DataFusion/Rust). This PR implements the writer layer for basic types, following Flink's official flink-python Arrow implementation as requested during Part 1 review (#1959).

What changes are included in this PR?

Commit 1: ArrowFieldWriter base class + 12 type writers (16 files, +2181 lines)

  • ArrowFieldWriter<IN> — Generic abstract base class using template method pattern (write()doWrite() + count++), aligned with Flink's flink-python ArrowFieldWriter.
  • 12 concrete writers in writers/ sub-package, each with forRow()/forArray() dual-mode factory methods:
    • Numeric: IntWriter, TinyIntWriter, SmallIntWriter, BigIntWriter, FloatWriter, DoubleWriter
    • Non-numeric: BooleanWriter, VarCharWriter, VarBinaryWriter, DecimalWriter, DateWriter, NullWriter
  • Key design: Each writer (except NullWriter) has two public static final inner classes (XxxWriterForRow / XxxWriterForArray) because Flink's RowData and ArrayData have no common getter interface.
  • Special cases:
    • NullWriter: No inner classes needed, doWrite() is empty (NullVector values are inherently null)
    • DecimalWriter: Takes precision/scale parameters, includes fitBigDecimal() validation before writing (aligned with Flink's fromBigDecimal logic)
  • Unit tests: IntWriterTest (5), BasicWritersTest (20), NonNumericWritersTest (12) — 37 tests

Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files, +482 lines)

  • FlinkArrowWriter — Orchestrates per-column ArrowFieldWriter<RowData>[] to write Flink RowData into Arrow VectorSchemaRoot. Lifecycle: create()write(row)*finish()reset().
  • Factory methods in FlinkArrowUtilscreateArrowFieldWriterForRow()/createArrowFieldWriterForArray() dispatch writer creation based on Arrow vector type (instanceof chain). Both are package-private.
  • Integration tests: FlinkArrowWriterTest (7) — all-types write, null handling, multi-row batches, reset, empty batch, zero columns, unsupported type. Total: 53 tests, all passing.

Scope

This PR covers basic types only. Time, Timestamp, and complex types (Array/Map/Row) will be in Part 2b.

Are there any user-facing changes?

No. Internal API for Flink integration.

How was this patch tested?

53 tests across 4 test classes:

./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative

Result: 53 pass, 0 failures.

x-tong added 2 commits March 5, 2026 23:01
…for Flink Arrow conversion

Introduce the generic ArrowFieldWriter<T> base class and 12 concrete
type writers that convert Flink RowData/ArrayData fields to Arrow vectors.

The design follows Flink's official flink-python module (ArrowFieldWriter +
forRow/forArray dual-mode pattern), as requested during Part 1 review.

Writers added:
- Numeric: Int, TinyInt, SmallInt, BigInt, Float, Double
- Non-numeric: Boolean, VarChar, VarBinary, Decimal, Date, Null

Each writer (except NullWriter) has two inner classes for RowData and
ArrayData access, instantiated via static forRow()/forArray() factory
methods.

Includes unit tests: IntWriterTest (5), BasicWritersTest (20),
NonNumericWritersTest (12) — 37 tests total.
…hods

FlinkArrowWriter orchestrates per-column ArrowFieldWriters to write
Flink RowData into an Arrow VectorSchemaRoot. Lifecycle: create() ->
write(row)* -> finish() -> reset().

Add createArrowFieldWriterForRow/ForArray factory methods in
FlinkArrowUtils that dispatch writer creation based on Arrow vector
type (instanceof chain). Both are package-private as they are only
used by FlinkArrowWriter.create().

Includes integration tests (7): all-types write, null handling,
multi-row batches, reset, empty batch, zero columns, unsupported type.

Total: 53 tests across 4 test classes, all passing.
@github-actions github-actions bot added the flink label Mar 9, 2026
@Tartarus0zm Tartarus0zm self-requested a review March 10, 2026 02:34
Copy link
Contributor

@Tartarus0zm Tartarus0zm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your contribute!
LGTM

@Tartarus0zm Tartarus0zm merged commit 64de43f into apache:master Mar 10, 2026
117 checks passed
Tartarus0zm pushed a commit that referenced this pull request Mar 17, 2026
…2086)

# Which issue does this PR close?

Partially addresses #1850 (Part 2b of the Flink RowData to Arrow
conversion).

# Rationale for this change

Part 2a (#2079) implemented `ArrowFieldWriter` base class, 12 basic type
writers, and `FlinkArrowWriter` orchestrator. This PR completes the
remaining 5 writer types (Time, Timestamp, Array, Map, Row), enabling
full coverage of all Flink logical types supported by the Arrow type
mapping introduced in Part 1 (#1959).

The implementation follows Flink's official `flink-python` Arrow module
as established in Part 2a, with the same `forRow()`/`forArray()`
dual-mode factory pattern and template method design.

# What changes are included in this PR?

## Commit 1: 5 ArrowFieldWriters + unit tests (10 files, +1509 lines)

- **`TimeWriter`** — Handles all 4 Arrow time precisions
(`TimeSecVector`, `TimeMilliVector`, `TimeMicroVector`,
`TimeNanoVector`) via instanceof dispatch. Flink stores TIME as int
(milliseconds), converted to each precision with `L`-suffixed literals
to avoid int overflow.
- **`TimestampWriter`** — Handles all 4 Arrow timestamp precisions.
Combines `TimestampData.getMillisecond()` (long) and
`getNanoOfMillisecond()` (int) for sub-millisecond precision.
Constructor validates `timezone == null` via `Preconditions.checkState`,
matching Flink official — timezone is not handled at the writer layer.
- **`ArrayWriter`** — Delegates to an `elementWriter`
(`ArrowFieldWriter<ArrayData>`) for each array element. Overrides
`finish()`/`reset()` to propagate to the element writer.
- **`MapWriter`** — Arrow maps are `List<Struct{key, value}>`. Holds
separate key and value writers operating on `ArrayData`. Sets
`structVector.setIndexDefined()` for each entry. Overrides
`finish()`/`reset()` to propagate to key/value writers.
- **`RowWriter`** — Nested struct handling with
`ArrowFieldWriter<RowData>[]` for child fields. Caches a `nullRow`
(`GenericRowData`) in the constructor for null struct handling (avoids
per-call allocation). Uses a single child-write loop for both null and
non-null paths, matching Flink official.
- **Unit tests**: `TimeWriterTest` (8), `TimestampWriterTest` (9),
`ArrayWriterTest` (5), `MapWriterTest` (3), `RowWriterTest` (3) — 28
tests covering all precisions, null handling, reset/multi-batch, edge
cases (pre-epoch timestamps, empty arrays/maps).

## Commit 2: Factory method extension + integration test (2 files, +158
lines)

- **`FlinkArrowUtils`** — Extended `createArrowFieldWriterForRow()` and
`createArrowFieldWriterForArray()` with branches for `TimeWriter`,
`TimestampWriter`, `ArrayWriter`, `MapWriter`, `RowWriter`. MapVector
check is placed before ListVector (since `MapVector extends
ListVector`). Timestamp branch extracts precision from both
`TimestampType` and `LocalZonedTimestampType`.
- **`FlinkArrowWriterTest`** — Added `testWriteTemporalAndComplexTypes`
integration test covering TIME(6), TIMESTAMP(6), TIMESTAMP_LTZ(3),
ARRAY\<INT\>, MAP\<VARCHAR, INT\>, ROW\<nested_id INT\>. Updated
`testUnsupportedTypeThrows` to use `MultisetType` (since `ArrayType` is
now supported).

# Scope

This PR completes all Flink-to-Arrow writer types. The remaining work
for #1850 is the reverse direction (Arrow-to-Flink reader), which is
tracked separately.

# Are there any user-facing changes?

No. Internal API for Flink integration.

# How was this patch tested?

36 tests across 6 test classes (28 new + 8 existing):

```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```

Result: 36 pass, 0 failures.
@Tartarus0zm Tartarus0zm linked an issue Mar 17, 2026 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce Flink RowData to Arrow

2 participants