[spark] support batch read from fluss cluster #2377
Conversation
wuchong left a comment:
@YannByron thanks, I left some comments.
fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java (outdated; resolved)
...ark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala (outdated; resolved)
FlussUpsertInputPartition(
  tableBucket,
  snapshotIdOpt.getAsLong,
  logOffsetOpt.getAsLong
Since this is a batch InputPartition, we should add an end offset to make the log split bounded. The latest end offset can be obtained from the OffsetsInitializer.latest().getBucketOffsets(..) method.
We should:
- Fetch the latest kvSnapshots; it is a map<bucket, snapshot_id & log_start_offset>.
- Fetch the latest offsets from OffsetsInitializer.latest; it is a map<bucket, log_end_offset>.
- Join the kvSnapshots with the OffsetsInitializer.latest result to generate an input partition list for each bucket (see the sketch below).
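A minimal sketch of that planning join, in Scala. `KvSnapshot`, `BoundedUpsertInputPartition`, and the map shapes are illustrative placeholders for whatever the Fluss client actually returns, not the real API:

```scala
// Sketch only: join the latest KV snapshots with the latest log end offsets to
// produce one bounded input partition per bucket. KvSnapshot and the
// (bucket, snapshotId, startOffset, endOffset) partition shape are assumptions.
object BatchScanPlanning {

  final case class KvSnapshot(snapshotId: Long, logStartOffset: Long)
  final case class BoundedUpsertInputPartition(
      bucket: Int,
      snapshotId: Long,
      logStartOffset: Long,
      logEndOffset: Long)

  def planInputPartitions(
      kvSnapshots: Map[Int, KvSnapshot], // bucket -> (snapshot_id, log_start_offset)
      latestOffsets: Map[Int, Long]      // bucket -> log_end_offset, e.g. fetched via
                                         // OffsetsInitializer.latest().getBucketOffsets(..)
  ): Seq[BoundedUpsertInputPartition] =
    kvSnapshots.toSeq.map { case (bucket, snap) =>
      // The end offset bounds the log split so the batch scan knows where to stop.
      val endOffset = latestOffsets.getOrElse(bucket, snap.logStartOffset)
      BoundedUpsertInputPartition(bucket, snap.snapshotId, snap.logStartOffset, endOffset)
    }
}
```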
OK, then we should move OffsetsInitializer-related code to fluss-client first.
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala (outdated; resolved)
}

// Poll for more log records
val scanRecords: ScanRecords = logScanner.poll(POLL_TIMEOUT)
logScanner.poll() is a best-effort API: it may return an empty result due to transient issues (e.g., network glitches) even when unread log records remain on the server. Therefore, we should poll in a loop until we reach the known end offset.
The end offset should be determined at job startup using OffsetsInitializer.latest().getBucketOffsets(...), which gives us the high-watermark for each bucket at the beginning of the batch job.
Since there’s no built-in API to read a bounded log split, we must manually:
- Skip any records with offsets beyond the precomputed end offset, and
- Signal there is no next once all buckets have reached their respective end offsets (see the sketch below).
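A rough sketch of such a bounded read loop. The `Record` trait and the `poll` function parameter are simplified stand-ins for `LogScanner.poll(...)` / `ScanRecords`; only the skip-past-end and all-buckets-finished logic is the point:

```scala
// Sketch only: keep polling the (unbounded) log scanner until every bucket reaches
// its end offset fixed at planning time; drop anything written after that point.
import scala.collection.mutable

trait Record { def bucket: Int; def offset: Long }

final class BoundedLogReader(
    poll: java.time.Duration => Seq[Record],  // wraps logScanner.poll(timeout)
    startOffsets: Map[Int, Long],             // bucket -> first offset to read
    endOffsets: Map[Int, Long],               // bucket -> exclusive end offset
    pollTimeout: java.time.Duration) {

  // Next offset still expected per bucket; a bucket is done once it reaches its end offset.
  private val nextOffsets = mutable.Map(startOffsets.toSeq: _*)
  private val buffer = mutable.Queue.empty[Record]

  private def finished: Boolean =
    endOffsets.forall { case (bucket, end) => nextOffsets.getOrElse(bucket, 0L) >= end }

  /** Returns the next in-range record, or None once every bucket reached its end offset. */
  def next(): Option[Record] = {
    while (buffer.isEmpty && !finished) {
      // poll() is best-effort: an empty result does not mean the log is exhausted,
      // so keep polling until the precomputed end offsets are reached.
      for (rec <- poll(pollTimeout)) {
        nextOffsets(rec.bucket) = math.max(nextOffsets.getOrElse(rec.bucket, 0L), rec.offset + 1)
        // Drop records written after planning time, i.e. at or beyond the end offset.
        if (rec.offset < endOffsets.getOrElse(rec.bucket, Long.MaxValue)) buffer.enqueue(rec)
      }
    }
    if (buffer.nonEmpty) Some(buffer.dequeue()) else None
  }
}
```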
Just to confirm: is BatchScanner.pollBatch() also a best-effort API?
@YannByron the difference is that LogScanner is unbounded and BatchScanner is bounded. So when org.apache.fluss.client.table.scanner.log.LogScanner#poll returns an empty ScanRecords, it doesn't mean the source is finished. When BatchScanner.pollBatch() returns a null iterator, it means the scanner has reached the end of the batch.
Understood. Thank you.
logRecords = bucketRecords.iterator()
if (logRecords.hasNext) {
  val scanRecord = logRecords.next()
  currentRow = convertToSparkRow(scanRecord)
The LogRecord is a changelog that contains -D (delete) and -U (update-before) records. To produce a consistent view, we need to merge these changes with the KV snapshot data in a union-read fashion—just like how we combine data lake snapshots with changelogs.
Fortunately, the KV snapshot scan is already sorted by primary key. We can leverage this by:
- Materializing the delta changes into a temporary delta table;
- Sorting the delta table by primary key using org.apache.fluss.row.encode.KeyEncoder#of(...);
- Performing a sort-merge between the sorted KV snapshot reader and the sorted delta table reader.
This enables an efficient and correct merge without requiring random lookups or hash-based joins.
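A sketch of that sort-merge, assuming primary keys are compared as encoded byte arrays (as a key encoder such as KeyEncoder#of would produce) and the delta buffer has already been sorted and deduplicated to one final change per key; `Row`, `Change`, and `ChangeKind` are illustrative placeholders rather than Fluss types:

```scala
// Sketch only: merge a primary-key-sorted KV snapshot with key-sorted delta changes;
// on equal keys the delta wins, and a delete suppresses the snapshot row.
object SnapshotDeltaMerge {

  sealed trait ChangeKind
  case object Upsert extends ChangeKind // +I / +U after-images
  case object Delete extends ChangeKind // -D (the -U before-images can be dropped)

  final case class Row(key: Array[Byte], data: AnyRef)
  final case class Change(key: Array[Byte], kind: ChangeKind, data: AnyRef)

  private def cmp(a: Array[Byte], b: Array[Byte]): Int =
    java.util.Arrays.compareUnsigned(a, b)

  /** Merges the sorted snapshot with the sorted, deduplicated deltas. */
  def sortMerge(snapshot: Iterator[Row], sortedDeltas: Iterator[Change]): Iterator[Row] =
    new Iterator[Row] {
      private val snap = snapshot.buffered
      private val delta = sortedDeltas.buffered
      private var nextRow: Option[Row] = advance()

      private def advance(): Option[Row] = {
        while (snap.hasNext || delta.hasNext) {
          if (!delta.hasNext) return Some(snap.next())
          if (!snap.hasNext || cmp(delta.head.key, snap.head.key) < 0) {
            val c = delta.next() // key exists only in the delta
            if (c.kind == Upsert) return Some(Row(c.key, c.data))
            // a delete for a key the snapshot never had: emit nothing
          } else if (cmp(snap.head.key, delta.head.key) < 0) {
            return Some(snap.next()) // key untouched by the changelog
          } else {
            snap.next() // same key: the delta overrides the snapshot row
            val c = delta.next()
            if (c.kind == Upsert) return Some(Row(c.key, c.data)) // else deleted
          }
        }
        None
      }

      override def hasNext: Boolean = nextRow.isDefined
      override def next(): Row = { val r = nextRow.get; nextRow = advance(); r }
    }
}
```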
So, I should fetch and keep all the changes between the starting offset and the stopping offset (since the delta changes are not sorted), then sort them and execute a sort-merge with the KV snapshot, right?
And if there are too many changes, spilling will be necessary in the future.
@YannByron you are right. But the Flink connector hasn't implemented spill logic yet either.
Purpose
Linked issue: close #2376
Brief change log
Tests
API and Format
Documentation