@@ -50,4 +50,31 @@ object SparkFlussConf {
.durationType()
.defaultValue(Duration.ofMillis(10000L))
.withDescription("The timeout for log scanner to poll records.")

val MAX_OFFSETS_PER_TRIGGER: ConfigOption[java.lang.Long] =
ConfigBuilder
.key("scan.max.offsets.per.trigger")
.longType()
.noDefaultValue()
.withDescription(
"Maximum number of offsets processed per trigger interval, " +
"proportionally split across table buckets.")

val MIN_OFFSETS_PER_TRIGGER: ConfigOption[java.lang.Long] =
ConfigBuilder
.key("scan.min.offsets.per.trigger")
.longType()
.noDefaultValue()
      .withDescription(
        "Minimum number of offsets to process per trigger interval. If fewer offsets are " +
          "available, the trigger can be delayed for up to scan.max.trigger.delay. " +
          "Requires scan.max.offsets.per.trigger to be set.")

val MAX_TRIGGER_DELAY: ConfigOption[Duration] =
ConfigBuilder
.key("scan.max.trigger.delay")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Maximum time a trigger can be delayed waiting for scan.min.offsets.per.trigger " +
"to be satisfied.")
}
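
An illustration of how the new options are meant to be combined on a streaming read; the table name, option values, and trigger settings below are hypothetical, not part of this change:

    // Hypothetical usage sketch: cap each micro-batch at 10000 offsets, and ask for at
    // least 1000 offsets per trigger, waiting at most two minutes for them to accumulate.
    val stream = spark.readStream
      .option("scan.startup.mode", "earliest")
      .option("scan.max.offsets.per.trigger", "10000")
      .option("scan.min.offsets.per.trigger", "1000")
      .option("scan.max.trigger.delay", "120000") // milliseconds when passed as a reader option
      .table("fluss_catalog.fluss.orders")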
@@ -22,6 +22,7 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
import org.apache.fluss.spark.SparkFlussConf
import org.apache.fluss.utils.json.TableBucketOffsets

import org.apache.spark.internal.Logging
@@ -69,6 +70,40 @@ abstract class FlussMicroBatchStream(
val stoppingOffsetsInitializer: OffsetsInitializer =
FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)

private val maxOffsetsPerTrigger: Option[Long] = {
val optVal = Option(options.get(SparkFlussConf.MAX_OFFSETS_PER_TRIGGER.key()))
.map(_.toLong)
optVal.orElse(Option(flussConfig.get(SparkFlussConf.MAX_OFFSETS_PER_TRIGGER)).map(_.toLong))
}

private val minOffsetsPerTrigger: Option[Long] = {
val optVal = Option(options.get(SparkFlussConf.MIN_OFFSETS_PER_TRIGGER.key()))
.map(_.toLong)
optVal.orElse(Option(flussConfig.get(SparkFlussConf.MIN_OFFSETS_PER_TRIGGER)).map(_.toLong))
}

private val maxTriggerDelayMs: Long = {
Option(options.get(SparkFlussConf.MAX_TRIGGER_DELAY.key()))
.map(_.toLong) // accept milliseconds from DataFrameReader options
.getOrElse(flussConfig.get(SparkFlussConf.MAX_TRIGGER_DELAY).toMillis)
}

// Validate that minOffsetsPerTrigger is not used without maxOffsetsPerTrigger
if (minOffsetsPerTrigger.isDefined && maxOffsetsPerTrigger.isEmpty) {
throw new IllegalArgumentException(
"minOffsetsPerTrigger requires maxOffsetsPerTrigger to be set")
}

// Validate that minOffsetsPerTrigger <= maxOffsetsPerTrigger
if (
minOffsetsPerTrigger.isDefined && maxOffsetsPerTrigger.isDefined &&
minOffsetsPerTrigger.get > maxOffsetsPerTrigger.get
) {
throw new IllegalArgumentException(
s"minOffsetsPerTrigger (${minOffsetsPerTrigger.get}) must not be greater than " +
s"maxOffsetsPerTrigger (${maxOffsetsPerTrigger.get})")
}

protected def projection: Array[Int] = {
val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
readSchema.fields.map {
@@ -94,7 +129,15 @@
}

override def getDefaultReadLimit: ReadLimit = {
ReadLimit.allAvailable()
(minOffsetsPerTrigger, maxOffsetsPerTrigger) match {
case (Some(minOffsets), Some(maxOffsets)) =>
ReadLimit.compositeLimit(
Array(ReadLimit.minRows(minOffsets, maxTriggerDelayMs), ReadLimit.maxRows(maxOffsets)))
case (_, Some(maxOffsets)) =>
ReadLimit.maxRows(maxOffsets)
case _ =>
ReadLimit.allAvailable()
}
}

override def initialOffset(): Offset = {
@@ -103,16 +146,97 @@
}

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
if (!readLimit.isInstanceOf[ReadAllAvailable]) {
throw new UnsupportedOperationException(s"Only ReadAllAvailable is supported, but $readLimit")
}

val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
allDataForTriggerAvailableNow.get
} else {
fetchLatestOffsets().get
}
FlussSourceOffset(latestTableBucketOffsets)

// Use the more restrictive of readLimit and maxOffsetsPerTrigger
val limitFromReadLimit = extractMaxOffsets(readLimit)
val effectiveLimit = (limitFromReadLimit, maxOffsetsPerTrigger) match {
case (Some(a), Some(b)) => Some(Math.min(a, b))
case (a @ Some(_), None) => a
case (None, b @ Some(_)) => b
case _ => None
}

val cappedOffsets = effectiveLimit match {
case Some(limit) if start != null =>
val startOffsets = start.asInstanceOf[FlussSourceOffset].tableBucketOffsets
capOffsets(startOffsets, latestTableBucketOffsets, limit)
case _ =>
latestTableBucketOffsets
}
FlussSourceOffset(cappedOffsets)
}

/**
* Extracts the maximum number of offsets from the given ReadLimit. Returns None if all available
* data should be read (no capping).
*/
private def extractMaxOffsets(readLimit: ReadLimit): Option[Long] = {
readLimit match {
case _: ReadAllAvailable => None
case maxRows: ReadMaxRows => Some(maxRows.maxRows())
case composite: CompositeReadLimit =>
composite.getReadLimits
.collectFirst { case maxRows: ReadMaxRows => maxRows.maxRows() }
case _ =>
throw new UnsupportedOperationException(
s"Unsupported ReadLimit type: ${readLimit.getClass.getSimpleName}")
}
}

/**
* Caps the latest offsets so that the total number of offsets (records) across all buckets does
* not exceed `maxOffsets`. The limit is distributed proportionally across buckets based on their
* available data.
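   * For example (illustrative numbers only): with maxOffsets = 10 and per-bucket backlogs of
   * 2, 5 and 9, buckets are visited smallest-first and receive min(2, 10/3) = 2, min(5, 8/2) = 4
   * and min(9, 4/1) = 4 offsets respectively, consuming exactly 10 while fully draining the
   * smallest bucket.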
*/
private def capOffsets(
startOffsets: TableBucketOffsets,
latestOffsets: TableBucketOffsets,
maxOffsets: Long): TableBucketOffsets = {
val bucketAvailableMap = latestOffsets.getOffsets.asScala.map {
case (bucket, latestOffset) =>
val startOffset = Option(startOffsets.getOffsets.get(bucket))
.map(_.toLong)
.getOrElse(0L)
bucket -> Math.max(0L, latestOffset.toLong - startOffset)
}

val totalAvailable = bucketAvailableMap.values.sum

if (totalAvailable <= maxOffsets) {
// All available data fits within the limit, no capping needed
latestOffsets
} else {
// Proportionally distribute maxOffsets across buckets
var remainingOffsets = maxOffsets
val sortedBuckets = bucketAvailableMap.toSeq.sortBy(_._2)

val allocations = scala.collection.mutable.Map[TableBucket, Long]()
var bucketsLeft = sortedBuckets.size

for ((bucket, available) <- sortedBuckets) {
// Fair share for this bucket
val fairShare = remainingOffsets / bucketsLeft
val allocated = Math.min(available, fairShare)
allocations(bucket) = allocated
remainingOffsets -= allocated
bucketsLeft -= 1
}

val cappedOffsets = latestOffsets.getOffsets.asScala.map {
case (bucket, _) =>
val startOffset = Option(startOffsets.getOffsets.get(bucket))
.map(_.toLong)
.getOrElse(0L)
val allocated = allocations.getOrElse(bucket, 0L)
bucket -> java.lang.Long.valueOf(startOffset + allocated)
}
new TableBucketOffsets(latestOffsets.getTableId, cappedOffsets.asJava)
}
}

override def prepareForTriggerAvailableNow(): Unit = {
@@ -19,6 +19,7 @@ package org.apache.fluss.spark

import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
import org.apache.fluss.client.table.Table
import org.apache.fluss.spark.SparkFlussConf
import org.apache.fluss.spark.read.{FlussMicroBatchStream, FlussSourceOffset}
import org.apache.fluss.spark.write.{FlussAppendDataWriter, FlussUpsertDataWriter}
import org.apache.fluss.utils.json.TableBucketOffsets
@@ -336,6 +337,58 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
}
}

test("read: log table with maxOffsetsPerTrigger rate limit") {
val tableName = "t"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (id int, data string) TBLPROPERTIES('bucket.num' = '1')")

      // Pre-load data BEFORE starting the stream and use "earliest" startup mode so the
      // stream reads from the beginning. This avoids using AddFlussData mid-stream.
sql(
s"INSERT INTO $tableName VALUES (1, 'data1'), (2, 'data2'), (3, 'data3'), (4, 'data4'), (5, 'data5')")

val clock = new StreamManualClock
testStream(
spark.readStream
.options(
Map(
"scan.startup.mode" -> "earliest",
SparkFlussConf.MAX_OFFSETS_PER_TRIGGER.key() -> "2"
))
.table(tableName))(
StartStream(trigger = Trigger.ProcessingTime(500), clock),

// First trigger should only get 2 records due to rate limit
AdvanceManualClock(500),
waitUntilBatchProcessed(clock),
CheckNewAnswer(Row(1, "data1"), Row(2, "data2")),

// Second trigger should get next 2 records
AdvanceManualClock(500),
waitUntilBatchProcessed(clock),
CheckNewAnswer(Row(3, "data3"), Row(4, "data4")),

// Third trigger should get the remaining 1 record
AdvanceManualClock(500),
waitUntilBatchProcessed(clock),
CheckNewAnswer(Row(5, "data5"))
)
}
}

  // Blocks until the currently running micro-batch has completed: once the stream is waiting
  // on the manual clock again, the batch triggered above has been fully processed. Mirrors the
  // equivalent helper used in Spark's own streaming source tests.
  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery {
q =>
eventually(timeout(streamingTimeout)) {
if (!q.exception.isDefined) {
assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
}
}
if (q.exception.isDefined) {
throw q.exception.get
}
true
}

private def writeToLogTable(table: Table, schema: StructType, dataArr: Seq[Row]): Unit = {
val writer = if (table.getTableInfo.hasPrimaryKey) {
FlussUpsertDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)