From 322763797f43861b054de7d12ca2058261cc695a Mon Sep 17 00:00:00 2001 From: Tartarus0zm Date: Fri, 13 Mar 2026 21:51:01 +0800 Subject: [PATCH] [AURON #2062] AuronKafkaSourceFunction support generating watermarks --- .../auron-flink-runtime/pom.xml | 10 + .../kafka/AuronKafkaDynamicTableFactory.java | 4 +- .../kafka/AuronKafkaDynamicTableSource.java | 28 ++- .../kafka/AuronKafkaSourceFunction.java | 232 ++++++++++++++++-- .../KafkaTopicPartitionAssigner.java | 60 +++++ .../SourceContextWatermarkOutputAdapter.java | 48 ++++ .../src/flink/kafka_scan_exec.rs | 43 ++-- 7 files changed, 361 insertions(+), 64 deletions(-) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml index e3695bcd5..568f25eec 100644 --- a/auron-flink-extension/auron-flink-runtime/pom.xml +++ b/auron-flink-extension/auron-flink-runtime/pom.xml @@ -26,6 +26,9 @@ auron-flink-runtime Apache Auron Flink Runtime ${flink.version} Apache Auron Flink Project + + 3.4.0 + @@ -72,6 +75,13 @@ provided + + + org.apache.kafka + kafka-clients + ${kafka.version} + + org.apache.auron diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java index 177a7c39f..319adb52b 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java +++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java @@ -92,8 +92,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); final ReadableConfig tableOptions = helper.getOptions(); try { - String kafkaPropertiesJson = mapper.writeValueAsString( - getKafkaProperties(context.getCatalogTable().getOptions())); Map formatConfig = new HashMap<>(); String format = tableOptions.getOptional(FactoryUtil.FORMAT).get(); formatConfig.put(KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD, tableOptions.get(NESTED_COLS_FIELD_MAPPING)); @@ -105,7 +103,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { return new AuronKafkaDynamicTableSource( context.getCatalogTable().getSchema().toPhysicalRowDataType(), tableOptions.get(TOPIC), - kafkaPropertiesJson, + getKafkaProperties(context.getCatalogTable().getOptions()), format, formatConfig, tableOptions.get(BUFFER_SIZE), diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java index 5c7be0057..837ea5817 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java @@ -17,7 +17,10 @@ package org.apache.auron.flink.connector.kafka; import java.util.Map; +import java.util.Properties; import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.ChangelogMode; @@ -25,6 +28,7 @@ import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -34,20 +38,22 @@ /** * A {@link DynamicTableSource} for Auron Kafka. */ -public class AuronKafkaDynamicTableSource implements ScanTableSource { +public class AuronKafkaDynamicTableSource implements ScanTableSource, SupportsWatermarkPushDown { private final DataType physicalDataType; private final String kafkaTopic; - private final String kafkaPropertiesJson; + private final Properties kafkaProperties; private final String format; private final Map formatConfig; private final int bufferSize; private final String startupMode; + /** Watermark strategy that is used to generate per-partition watermark. 
*/ + protected @Nullable WatermarkStrategy watermarkStrategy; public AuronKafkaDynamicTableSource( DataType physicalDataType, String kafkaTopic, - String kafkaPropertiesJson, + Properties kafkaProperties, String format, Map formatConfig, int bufferSize, @@ -56,7 +62,7 @@ public AuronKafkaDynamicTableSource( Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected."); this.physicalDataType = physicalDataType; this.kafkaTopic = kafkaTopic; - this.kafkaPropertiesJson = kafkaPropertiesJson; + this.kafkaProperties = kafkaProperties; this.format = format; this.formatConfig = formatConfig; this.bufferSize = bufferSize; @@ -75,11 +81,16 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { physicalDataType.getLogicalType(), auronOperatorId, kafkaTopic, - kafkaPropertiesJson, + kafkaProperties, format, formatConfig, bufferSize, startupMode); + + if (watermarkStrategy != null) { + sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy); + } + return new DataStreamScanProvider() { @Override @@ -98,11 +109,16 @@ public boolean isBounded() { @Override public DynamicTableSource copy() { return new AuronKafkaDynamicTableSource( - physicalDataType, kafkaTopic, kafkaPropertiesJson, format, formatConfig, bufferSize, startupMode); + physicalDataType, kafkaTopic, kafkaProperties, format, formatConfig, bufferSize, startupMode); } @Override public String asSummaryString() { return "Auron Kafka Dynamic Table Source"; } + + @Override + public void applyWatermark(WatermarkStrategy watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + } } diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index 569e63a4f..52c56fa28 100644 --- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -17,6 +17,7 @@ package org.apache.auron.flink.connector.kafka; import static org.apache.auron.flink.connector.kafka.KafkaConstants.*; +import static org.apache.flink.util.Preconditions.checkNotNull; import java.io.File; import java.io.InputStream; @@ -37,14 +38,22 @@ import org.apache.auron.protobuf.PhysicalPlanNode; import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -52,12 +61,18 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; 
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner; +import org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.SerializableObject; +import org.apache.flink.util.SerializedValue; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +81,10 @@ * Only support AT-LEAST ONCE semantics. * If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint. * If checkpoints are disabled, Kafka offsets are committed periodically via Auron. + * + *

Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition + * watermark generation. Partition expansion is detected periodically using a lightweight + * {@link KafkaConsumer} (metadata queries only, no data consumption). */ public class AuronKafkaSourceFunction extends RichParallelSourceFunction implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction { @@ -73,12 +92,13 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction formatConfig; private final int bufferSize; private final String startupMode; private transient PhysicalPlanNode physicalPlanNode; + // Flink Checkpoint-related, compatible with Flink Kafka Legacy source /** State name of the consumer's partition offset states. */ private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states"; @@ -90,17 +110,30 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction restoredOffsets; private transient Map currentOffsets; private final SerializableObject lock = new SerializableObject(); - + private SerializedValue> watermarkStrategy; private volatile boolean isRunning; private transient String auronOperatorIdWithSubtaskIndex; private transient MetricNode nativeMetric; private transient ObjectMapper mapper; + // Kafka Consumer for partition metadata discovery only (does NOT consume data) + private transient KafkaConsumer kafkaConsumer; + private transient List assignedPartitions; + + // Watermark related + private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer; + private transient Map partitionIdToOutputIdMap; + private transient WatermarkGenerator watermarkGenerator; + private transient TimestampAssigner timestampAssigner; + // Periodic watermark control: autoWatermarkInterval > 0 means enabled + private transient long autoWatermarkInterval; + private transient long lastPeriodicWatermarkTime; + public AuronKafkaSourceFunction( LogicalType outputType, String auronOperatorId, String topic, - 
String kafkaPropertiesJson, + Properties kafkaProperties, String format, Map formatConfig, int bufferSize, @@ -108,7 +141,7 @@ public AuronKafkaSourceFunction( this.outputType = outputType; this.auronOperatorId = auronOperatorId; this.topic = topic; - this.kafkaPropertiesJson = kafkaPropertiesJson; + this.kafkaProperties = kafkaProperties; this.format = format; this.formatConfig = formatConfig; this.bufferSize = bufferSize; @@ -118,12 +151,12 @@ public AuronKafkaSourceFunction( @Override public void open(Configuration config) throws Exception { // init auron plan + mapper = new ObjectMapper(); PhysicalPlanNode.Builder sourcePlan = PhysicalPlanNode.newBuilder(); KafkaScanExecNode.Builder scanExecNode = KafkaScanExecNode.newBuilder(); scanExecNode.setKafkaTopic(this.topic); - scanExecNode.setKafkaPropertiesJson(this.kafkaPropertiesJson); + scanExecNode.setKafkaPropertiesJson(mapper.writeValueAsString(kafkaProperties)); scanExecNode.setDataFormat(KafkaFormat.valueOf(this.format.toUpperCase(Locale.ROOT))); - mapper = new ObjectMapper(); scanExecNode.setFormatConfigJson(mapper.writeValueAsString(formatConfig)); scanExecNode.setBatchSize(this.bufferSize); if (this.format.equalsIgnoreCase(KafkaConstants.KAFKA_FORMAT_PROTOBUF)) { @@ -153,21 +186,68 @@ public void open(Configuration config) throws Exception { sourcePlan.setKafkaScan(scanExecNode.build()); this.physicalPlanNode = sourcePlan.build(); + // 1. 
Initialize Kafka Consumer for partition metadata discovery only (not for data consumption) + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaProperties); + // Override to ensure this consumer does not interfere with actual data consumption + kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-auron-fetch-meta"); + kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + kafkaProps.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + kafkaProps.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + this.kafkaConsumer = new KafkaConsumer<>(kafkaProps); + StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); + // 2. Discover and assign partitions for this subtask + List partitionInfos = kafkaConsumer.partitionsFor(topic); + int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); + int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); + + this.assignedPartitions = new ArrayList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + int partitionId = partitionInfo.partition(); + if (KafkaTopicPartitionAssigner.assign(topic, partitionId, numSubtasks) == subtaskIndex) { + assignedPartitions.add(partitionId); + } + } boolean enableCheckpoint = runtimeContext.isCheckpointingEnabled(); Map auronRuntimeInfo = new HashMap<>(); - auronRuntimeInfo.put("subtask_index", runtimeContext.getIndexOfThisSubtask()); - auronRuntimeInfo.put("num_readers", runtimeContext.getNumberOfParallelSubtasks()); + auronRuntimeInfo.put("subtask_index", subtaskIndex); + auronRuntimeInfo.put("num_readers", numSubtasks); auronRuntimeInfo.put("enable_checkpoint", enableCheckpoint); auronRuntimeInfo.put("restored_offsets", restoredOffsets); + auronRuntimeInfo.put("assigned_partitions", assignedPartitions); JniBridge.putResource(auronOperatorIdWithSubtaskIndex, 
mapper.writeValueAsString(auronRuntimeInfo)); - this.isRunning = true; - LOG.info( - "Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}", - auronOperatorIdWithSubtaskIndex, - enableCheckpoint); currentOffsets = new HashMap<>(); pendingOffsetsToCommit = new LinkedMap(); + LOG.info( + "Auron kafka source init successful, Auron operator id: {}, enableCheckpoint is {}, " + + "subtask {} assigned partitions: {}", + auronOperatorIdWithSubtaskIndex, + enableCheckpoint, + subtaskIndex, + assignedPartitions); + + // 3. Initialize Watermark components if watermarkStrategy is set + if (watermarkStrategy != null) { + ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader(); + WatermarkStrategy deserializedWatermarkStrategy = + watermarkStrategy.deserializeValue(userCodeClassLoader); + + MetricGroup metricGroup = runtimeContext.getMetricGroup(); + + this.timestampAssigner = deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup); + + this.watermarkGenerator = deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup); + + // 4. 
Determine periodic watermark interval + // autoWatermarkInterval > 0 means periodic watermark is enabled + this.autoWatermarkInterval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); + this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first emit triggers immediately + } this.isRunning = true; } @@ -186,6 +266,22 @@ public void add(String name, long value) { fieldList.add(new RowType.RowField(KAFKA_AURON_META_TIMESTAMP, new BigIntType(false))); fieldList.addAll(((RowType) outputType).getFields()); RowType auronOutputRowType = new RowType(fieldList); + + // Initialize WatermarkOutputMultiplexer here because sourceContext is available + if (watermarkGenerator != null) { + this.watermarkOutputMultiplexer = + new WatermarkOutputMultiplexer(new SourceContextWatermarkOutputAdapter<>(sourceContext)); + this.partitionIdToOutputIdMap = new HashMap<>(); + for (Integer partition : assignedPartitions) { + String outputId = createOutputId(partition); + partitionIdToOutputIdMap.put(partition, outputId); + watermarkOutputMultiplexer.registerNewOutput(outputId, watermark -> {}); + } + } + + // Pre-check watermark flag to avoid per-record null checks in the hot path + final boolean enableWatermark = watermarkGenerator != null; + while (this.isRunning) { AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper( FlinkArrowUtils.getRootAllocator(), @@ -197,20 +293,71 @@ public void add(String name, long value) { AuronAdaptor.getInstance() .getAuronConfiguration() .getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE)); - while (wrapper.loadNextBatch(batch -> { - Map tmpOffsets = new HashMap<>(currentOffsets); - FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3); - for (int i = 0; i < batch.getRowCount(); i++) { - AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i); - // update kafka partition and offsets - tmpOffsets.put(tmpRowData.getInt(-3), tmpRowData.getLong(-2)); - 
sourceContext.collect(arrowReader.read(i)); - } - synchronized (lock) { - currentOffsets = tmpOffsets; - } - })) {} - ; + + if (enableWatermark) { + // Watermark-enabled path + while (wrapper.loadNextBatch(batch -> { + Map tmpOffsets = new HashMap<>(currentOffsets); + FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3); + + for (int i = 0; i < batch.getRowCount(); i++) { + AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i); + // Extract kafka meta fields + int partitionId = tmpRowData.getInt(-3); + long offset = tmpRowData.getLong(-2); + long kafkaTimestamp = tmpRowData.getLong(-1); + tmpOffsets.put(partitionId, offset); + + // Extract event timestamp via user-defined TimestampAssigner + long timestamp = timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp); + + // Route to the per-partition WatermarkOutput and trigger onEvent + // outputId must not null, else is a bug + String outputId = partitionIdToOutputIdMap.get(partitionId); + WatermarkOutput partitionOutput = watermarkOutputMultiplexer.getImmediateOutput(outputId); + watermarkGenerator.onEvent(tmpRowData, timestamp, partitionOutput); + // Emit record with event timestamp + sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp); + } + + // Periodic watermark: only emit if enough time has elapsed since last emit + // Controlled by ExecutionConfig.getAutoWatermarkInterval() + long currentTime = System.currentTimeMillis(); + if (autoWatermarkInterval > 0 + && (currentTime - lastPeriodicWatermarkTime) >= autoWatermarkInterval) { + for (Map.Entry entry : partitionIdToOutputIdMap.entrySet()) { + // Use getDeferredOutput for periodic emit: all partitions update first, + // then multiplexer merges and emits once via onPeriodicEmit() + WatermarkOutput output = watermarkOutputMultiplexer.getDeferredOutput(entry.getValue()); + watermarkGenerator.onPeriodicEmit(output); + } + // Merge all deferred updates and emit the combined watermark 
downstream + watermarkOutputMultiplexer.onPeriodicEmit(); + lastPeriodicWatermarkTime = currentTime; + } + + synchronized (lock) { + currentOffsets = tmpOffsets; + } + })) {} + } else { + // No-watermark path: still use collectWithTimestamp with kafka timestamp + while (wrapper.loadNextBatch(batch -> { + Map tmpOffsets = new HashMap<>(currentOffsets); + FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3); + for (int i = 0; i < batch.getRowCount(); i++) { + AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i); + int partitionId = tmpRowData.getInt(-3); + long offset = tmpRowData.getLong(-2); + long kafkaTimestamp = tmpRowData.getLong(-1); + tmpOffsets.put(partitionId, offset); + sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp); + } + synchronized (lock) { + currentOffsets = tmpOffsets; + } + })) {} + } } LOG.info("Auron kafka source run end"); } @@ -220,6 +367,18 @@ public void cancel() { this.isRunning = false; } + @Override + public void close() throws Exception { + this.isRunning = false; + + // Close the metadata-only Kafka Consumer + if (kafkaConsumer != null) { + kafkaConsumer.close(); + } + + super.close(); + } + @Override public List getPhysicalPlanNodes() { return Collections.singletonList(physicalPlanNode); @@ -318,4 +477,23 @@ public void initializeState(FunctionInitializationContext context) throws Except LOG.info("Not restore from state."); } } + + public AuronKafkaSourceFunction assignTimestampsAndWatermarks(WatermarkStrategy watermarkStrategy) { + checkNotNull(watermarkStrategy); + try { + ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.watermarkStrategy = new SerializedValue<>(watermarkStrategy); + } catch (Exception e) { + throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e); + } + return this; + } + + // ------------------------------------------------------------------------- + // 
Internal helpers + // ------------------------------------------------------------------------- + + private String createOutputId(int partitionId) { + return topic + "-" + partitionId; + } } diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java new file mode 100644 index 000000000..6555cdac4 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.annotation.Internal; + +/** Utility for assigning Kafka partitions to consumer subtasks. Copy from flink-connector-kafka. */ +@Internal +public class KafkaTopicPartitionAssigner { + + /** + * Returns the index of the target subtask that a specific Kafka partition should be assigned + * to. + * + *
+     * <p>The resulting distribution of partitions of a single topic has the following contract:
+     *
+     * <ul>
+     *   <li>1. Uniformly distributed across subtasks
+     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
+     *       indices) by using the partition id as the offset from a starting index (i.e., the index
+     *       of the subtask which partition 0 of the topic will be assigned to, determined using the
+     *       topic name).
+     * </ul>
+     *
The above contract is crucial and cannot be broken. Consumer subtasks rely on this + * contract to locally filter out partitions that it should not subscribe to, guaranteeing that + * all partitions of a single topic will always be assigned to some subtask in a uniformly + * distributed manner. + * + * @param partition the Kafka partition + * @param numParallelSubtasks total number of parallel subtasks + * @return index of the target subtask that the Kafka partition should be assigned to. + */ + public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { + return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks); + } + + public static int assign(String topic, int partition, int numParallelSubtasks) { + int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; + + // here, the assumption is that the id of Kafka partitions are always ascending + // starting from 0, and therefore can be used directly as the offset clockwise from the + // start index + return (startIndex + partition) % numParallelSubtasks; + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java new file mode 100644 index 000000000..ea8194417 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +/** + * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link + * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. 
+ */ +public class SourceContextWatermarkOutputAdapter implements WatermarkOutput { + private final SourceContext sourceContext; + + public SourceContextWatermarkOutputAdapter(SourceContext sourceContext) { + this.sourceContext = sourceContext; + } + + @Override + public void emitWatermark(Watermark watermark) { + sourceContext.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp())); + } + + @Override + public void markIdle() { + sourceContext.markAsTemporarilyIdle(); + } + + @Override + public void markActive() { + // will be set active with next watermark + } +} diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs index aa43e5986..f5be480c9 100644 --- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs +++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs @@ -252,6 +252,21 @@ fn read_serialized_records_from_kafka( let restored_offsets = task_json .get("restored_offsets") .expect("restored_offsets is not valid json"); + let mut partitions: Vec = vec![]; + if let Some(assigned_partitions) = task_json.get("assigned_partitions") { + if let Some(array) = assigned_partitions.as_array() { + array.iter().for_each(|v| { + if let Some(num) = v.as_i64() { + partitions.push(num as i32); + } + }); + } + } + if partitions.is_empty() { + return Err(DataFusionError::Execution(format!( + "No partitions found for topic: {kafka_topic}" + ))); + } let kafka_properties = sonic_rs::from_str::(&kafka_properties_json) .expect("kafka_properties_json is not valid json"); let mut config = ClientConfig::new(); @@ -279,34 +294,6 @@ fn read_serialized_records_from_kafka( .create_with_context(context) .expect("Kafka Consumer creation failed"), ); - let metadata = consumer - .fetch_metadata(Some(&kafka_topic), Some(std::time::Duration::from_secs(5))) - .expect("Failed to fetch kafka metadata"); - - // get topic metadata - let 
topic_metadata = metadata - .topics() - .iter() - .find(|t| t.name() == kafka_topic) - .expect("Topic not found"); - - // get partition metadata - let partitions: Vec = topic_metadata - .partitions() - .iter() - .filter(|p| { - flink_kafka_partition_assign(kafka_topic.clone(), p.id(), num_readers) - .expect("flink_kafka_partition_assign failed") - == subtask_index - }) - .map(|p| p.id()) - .collect(); - - if partitions.is_empty() { - return Err(DataFusionError::Execution(format!( - "No partitions found for topic: {kafka_topic}" - ))); - } // GROUP_OFFSET = 0; // EARLIEST = 1;