diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index 28fb633b52..f897290c50 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -50,4 +50,31 @@ object SparkFlussConf {
       .durationType()
       .defaultValue(Duration.ofMillis(10000L))
       .withDescription("The timeout for log scanner to poll records.")
+
+  val MAX_OFFSETS_PER_TRIGGER: ConfigOption[java.lang.Long] =
+    ConfigBuilder
+      .key("scan.max.offsets.per.trigger")
+      .longType()
+      .noDefaultValue()
+      .withDescription(
+        "Maximum number of offsets processed per trigger interval, " +
+          "proportionally split across table buckets.")
+
+  val MIN_OFFSETS_PER_TRIGGER: ConfigOption[java.lang.Long] =
+    ConfigBuilder
+      .key("scan.min.offsets.per.trigger")
+      .longType()
+      .noDefaultValue()
+      .withDescription(
+        "Minimum number of offsets per trigger to ensure progress during data spikes. " +
+          "Requires scan.max.offsets.per.trigger to be set.")
+
+  val MAX_TRIGGER_DELAY: ConfigOption[Duration] =
+    ConfigBuilder
+      .key("scan.max.trigger.delay")
+      .durationType()
+      .defaultValue(Duration.ofMinutes(5))
+      .withDescription(
+        "Maximum time a trigger can be delayed waiting for scan.min.offsets.per.trigger " +
+          "to be satisfied.")
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
index e3d77a9d79..db9f678734 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
@@ -22,6 +22,7 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
+import org.apache.fluss.spark.SparkFlussConf
 import org.apache.fluss.utils.json.TableBucketOffsets
 
 import org.apache.spark.internal.Logging
@@ -69,6 +70,41 @@ abstract class FlussMicroBatchStream(
   val stoppingOffsetsInitializer: OffsetsInitializer =
     FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)
 
+  // Rate-limit configuration. A per-query DataFrameReader option takes precedence
+  // over the value carried in the Fluss table configuration.
+  private val maxOffsetsPerTrigger: Option[Long] =
+    Option(options.get(SparkFlussConf.MAX_OFFSETS_PER_TRIGGER.key()))
+      .map(_.toLong)
+      .orElse(Option(flussConfig.get(SparkFlussConf.MAX_OFFSETS_PER_TRIGGER)).map(_.toLong))
+
+  private val minOffsetsPerTrigger: Option[Long] =
+    Option(options.get(SparkFlussConf.MIN_OFFSETS_PER_TRIGGER.key()))
+      .map(_.toLong)
+      .orElse(Option(flussConfig.get(SparkFlussConf.MIN_OFFSETS_PER_TRIGGER)).map(_.toLong))
+
+  // NOTE(review): the reader option is read as plain milliseconds while the table
+  // config is a Duration — confirm this asymmetry is intended and documented.
+  private val maxTriggerDelayMs: Long =
+    Option(options.get(SparkFlussConf.MAX_TRIGGER_DELAY.key()))
+      .map(_.toLong)
+      .getOrElse(flussConfig.get(SparkFlussConf.MAX_TRIGGER_DELAY).toMillis)
+
+  // Timestamp of the last batch allowed to fire; enforces scan.max.trigger.delay
+  // when scan.min.offsets.per.trigger is not yet satisfied.
+  private var lastTriggerMs: Long = System.currentTimeMillis()
+
+  // minOffsetsPerTrigger only makes sense as a lower bound on a capped batch.
+  if (minOffsetsPerTrigger.isDefined && maxOffsetsPerTrigger.isEmpty) {
+    throw new IllegalArgumentException(
+      "minOffsetsPerTrigger requires maxOffsetsPerTrigger to be set")
+  }
+
+  for {
+    minOffsets <- minOffsetsPerTrigger
+    maxOffsets <- maxOffsetsPerTrigger
+    if minOffsets > maxOffsets
+  } throw new IllegalArgumentException(
+    s"minOffsetsPerTrigger ($minOffsets) must not be greater than " +
+      s"maxOffsetsPerTrigger ($maxOffsets)")
+
   protected def projection: Array[Int] = {
     val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
     readSchema.fields.map {
@@ -94,7 +130,15 @@ abstract class FlussMicroBatchStream(
   }
 
   override def getDefaultReadLimit: ReadLimit = {
-    ReadLimit.allAvailable()
+    (minOffsetsPerTrigger, maxOffsetsPerTrigger) match {
+      case (Some(minOffsets), Some(maxOffsets)) =>
+        ReadLimit.compositeLimit(
+          Array(ReadLimit.minRows(minOffsets, maxTriggerDelayMs), ReadLimit.maxRows(maxOffsets)))
+      case (None, Some(maxOffsets)) =>
+        ReadLimit.maxRows(maxOffsets)
+      case _ =>
+        ReadLimit.allAvailable()
+    }
   }
 
   override def initialOffset(): Offset = {
@@ -103,16 +147,121 @@ abstract class FlussMicroBatchStream(
   }
 
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
-    if (!readLimit.isInstanceOf[ReadAllAvailable]) {
-      throw new UnsupportedOperationException(s"Only ReadAllAvailable is supported, but $readLimit")
-    }
     val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
       allDataForTriggerAvailableNow.get
     } else {
       fetchLatestOffsets().get
     }
-    FlussSourceOffset(latestTableBucketOffsets)
+
+    val startOffsetsOpt =
+      Option(start).map(_.asInstanceOf[FlussSourceOffset].tableBucketOffsets)
+
+    // Honor ReadMinRows: while fewer than minOffsets records are available and the max
+    // trigger delay has not elapsed, do not advance the offset (Spark skips the batch
+    // when latestOffset == start). Trigger.AvailableNow always drains everything.
+    val delayBatch = allDataForTriggerAvailableNow.isEmpty && {
+      (extractMinOffsets(readLimit), startOffsetsOpt) match {
+        case (Some(minOffsets), Some(startOffsets)) =>
+          totalAvailable(startOffsets, latestTableBucketOffsets) < minOffsets &&
+          System.currentTimeMillis() - lastTriggerMs < maxTriggerDelayMs
+        case _ => false
+      }
+    }
+
+    if (delayBatch) {
+      start
+    } else {
+      lastTriggerMs = System.currentTimeMillis()
+      // Use the more restrictive of the planner-provided limit and our own config.
+      val effectiveLimit = (extractMaxOffsets(readLimit), maxOffsetsPerTrigger) match {
+        case (Some(a), Some(b)) => Some(math.min(a, b))
+        case (a @ Some(_), None) => a
+        case (None, b @ Some(_)) => b
+        case _ => None
+      }
+      val cappedOffsets = (effectiveLimit, startOffsetsOpt) match {
+        case (Some(limit), Some(startOffsets)) =>
+          capOffsets(startOffsets, latestTableBucketOffsets, limit)
+        case _ => latestTableBucketOffsets
+      }
+      FlussSourceOffset(cappedOffsets)
+    }
   }
+
+  /** Max-rows cap carried by `readLimit`, or None when all available data may be read. */
+  private def extractMaxOffsets(readLimit: ReadLimit): Option[Long] = {
+    readLimit match {
+      case _: ReadAllAvailable => None
+      case _: ReadMinRows => None
+      case maxRows: ReadMaxRows => Some(maxRows.maxRows())
+      case composite: CompositeReadLimit =>
+        composite.getReadLimits.collectFirst { case maxRows: ReadMaxRows => maxRows.maxRows() }
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported ReadLimit type: ${readLimit.getClass.getSimpleName}")
+    }
+  }
+
+  /** Min-rows admission threshold carried by `readLimit`, if any. */
+  private def extractMinOffsets(readLimit: ReadLimit): Option[Long] = {
+    readLimit match {
+      case minRows: ReadMinRows => Some(minRows.minRows())
+      case composite: CompositeReadLimit =>
+        composite.getReadLimits.collectFirst { case minRows: ReadMinRows => minRows.minRows() }
+      case _ => None
+    }
+  }
+
+  /** Total records available across all buckets between `startOffsets` and `latestOffsets`. */
+  private def totalAvailable(
+      startOffsets: TableBucketOffsets,
+      latestOffsets: TableBucketOffsets): Long = {
+    latestOffsets.getOffsets.asScala.iterator.map {
+      case (bucket, latestOffset) =>
+        val startOffset = Option(startOffsets.getOffsets.get(bucket)).map(_.toLong).getOrElse(0L)
+        math.max(0L, latestOffset.toLong - startOffset)
+    }.sum
+  }
+
+  /**
+   * Caps `latestOffsets` so the total number of records across all buckets does not exceed
+   * `maxOffsets`. The budget is distributed fairly: buckets are visited in ascending order of
+   * available data, each taking at most its fair share of the remaining budget, so the unused
+   * share of small buckets rolls over to larger ones (the last, largest bucket absorbs any
+   * integer-division remainder).
+   */
+  private def capOffsets(
+      startOffsets: TableBucketOffsets,
+      latestOffsets: TableBucketOffsets,
+      maxOffsets: Long): TableBucketOffsets = {
+    def startOf(bucket: TableBucket): Long =
+      Option(startOffsets.getOffsets.get(bucket)).map(_.toLong).getOrElse(0L)
+
+    val availableByBucket = latestOffsets.getOffsets.asScala.map {
+      case (bucket, latestOffset) =>
+        bucket -> math.max(0L, latestOffset.toLong - startOf(bucket))
+    }
+
+    if (availableByBucket.values.sum <= maxOffsets) {
+      latestOffsets // everything fits, no capping needed
+    } else {
+      var remaining = maxOffsets
+      var bucketsLeft = availableByBucket.size
+      val allocations = scala.collection.mutable.Map[TableBucket, Long]()
+      // Smallest buckets first so leftover budget flows to bigger buckets.
+      for ((bucket, available) <- availableByBucket.toSeq.sortBy(_._2)) {
+        val fairShare = remaining / bucketsLeft
+        val allocated = math.min(available, fairShare)
+        allocations(bucket) = allocated
+        remaining -= allocated
+        bucketsLeft -= 1
+      }
+      val capped = latestOffsets.getOffsets.asScala.map {
+        case (bucket, _) =>
+          bucket -> java.lang.Long.valueOf(startOf(bucket) + allocations.getOrElse(bucket, 0L))
+      }
+      new TableBucketOffsets(latestOffsets.getTableId, capped.asJava)
+    }
+  }
 
   override def prepareForTriggerAvailableNow(): Unit = {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
index 6921952c7f..fe1f7ce9c6 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
@@ -336,6 +336,55 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
     }
   }
 
+  test("read: log table with maxOffsetsPerTrigger rate limit") {
+    val tableName = "t"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (id int, data string) TBLPROPERTIES('bucket.num' = '1')")
+
+      // Pre-load all rows before the stream starts and read from "earliest", so each
+      // micro-batch below exercises only the rate limit (no mid-stream data injection).
+      sql(
+        s"INSERT INTO $tableName VALUES (1, 'data1'), (2, 'data2'), (3, 'data3'), (4, 'data4'), (5, 'data5')")
+
+      val clock = new StreamManualClock
+      testStream(
+        spark.readStream
+          .options(
+            Map(
+              "scan.startup.mode" -> "earliest",
+              SparkFlussConf.MAX_OFFSETS_PER_TRIGGER.key() -> "2"))
+          .table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        // First trigger: capped at 2 records.
+        AdvanceManualClock(500),
+        waitUntilBatchProcessed(clock),
+        CheckNewAnswer(Row(1, "data1"), Row(2, "data2")),
+        // Second trigger: next 2 records.
+        AdvanceManualClock(500),
+        waitUntilBatchProcessed(clock),
+        CheckNewAnswer(Row(3, "data3"), Row(4, "data4")),
+        // Third trigger: the remaining record.
+        AdvanceManualClock(500),
+        waitUntilBatchProcessed(clock),
+        CheckNewAnswer(Row(5, "data5"))
+      )
+    }
+  }
+
+  // Blocks until the current micro-batch completes and the stream is waiting on the
+  // manual clock again, rethrowing any streaming-query failure.
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
+    eventually(timeout(streamingTimeout)) {
+      if (q.exception.isEmpty) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
   private def writeToLogTable(table: Table, schema: StructType, dataArr: Seq[Row]): Unit = {
     val writer = if (table.getTableInfo.hasPrimaryKey) {
       FlussUpsertDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)