32 changes: 32 additions & 0 deletions .github/workflows/pr_build_linux.yml
@@ -140,8 +140,40 @@
run: |
./dev/ci/check-working-tree-clean.sh

# Compile-only verification for Spark 4.1. Tests are intentionally skipped: the spark-4.1
# profile is currently a build target only, and several runtime/test failures are tracked
# in follow-up PRs. Excluded from lint-java because semanticdb-scalac_2.13.17 is not yet
# published and the lint job activates -Psemanticdb.
build-spark-4-1:
needs: lint
name: Build Spark 4.1, JDK 17
runs-on: ubuntu-latest
container:
image: amd64/rust
steps:
- uses: actions/checkout@v6

- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{ env.RUST_VERSION }}
jdk-version: 17

- name: Cache Maven dependencies
uses: actions/cache@v5
with:
path: |
~/.m2/repository
/root/.m2/repository
key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-spark-4.1-build
restore-keys: |
${{ runner.os }}-java-maven-

- name: Compile (skip tests)
run: ./mvnw -B install -DskipTests -Dmaven.test.skip=true -Pspark-4.1

# Build native library once and share with all test jobs
build-native:

[Code scanning / CodeQL warning: Workflow does not contain permissions (Medium)] Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
needs: lint
name: Build Native Library
runs-on: ${{ github.repository_owner == 'apache' && format('runs-on={0},family=m8a+m7a+c8a,cpu=8,image=ubuntu24-full-x64,extras=s3-cache,disk=large,tag=datafusion-comet', github.run_id) || 'ubuntu-latest' }}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.types.{DataType, StringType, StructType}

trait CometTypeShim {
// A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
// collations have semantics Comet's byte-level hashing/sorting/equality cannot honor. The
// default `StringType` object is `StringType(UTF8_BINARY_COLLATION_ID)`, so comparing
// `collationId` against that instance's id picks out non-default collations without needing
// `private[sql]` helpers on `StringType`.
def isStringCollationType(dt: DataType): Boolean = dt match {
case st: StringType => st.collationId != StringType.collationId
case _ => false
}

// Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
// fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
// ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
// variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
// and force scan fallback.
def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
}
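
A minimal usage sketch of the collation check (hypothetical; assumes Spark 4.x's `StringType.apply(collationName)` factory, which resolves collation names such as UTF8_LCASE):

  import org.apache.spark.sql.types.StringType

  object CollationCheckExample extends CometTypeShim {
    def main(args: Array[String]): Unit = {
      // The default StringType is UTF8_BINARY, which Comet handles natively.
      assert(!isStringCollationType(StringType))
      // A non-default collation cannot be honored by byte-level comparisons,
      // so it must trigger fallback to Spark.
      assert(isStringCollationType(StringType("UTF8_LCASE")))
    }
  }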
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
PartitionedFile(
partitionValues,
SparkPath.fromUrlString(file),
-1, // start: -1 combined with length -1 means the entire file is read
-1, // length
Array.empty[String], // locations
0, // modificationTime
0) // fileSize
}
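
Hypothetical usage, building a whole-file split for a local Parquet file (the path is illustrative):

  import org.apache.spark.sql.catalyst.InternalRow

  // No partition columns, so pass an empty InternalRow; start/length of -1
  // tell Comet's batch reader to scan the entire file.
  val file = ShimBatchReader.newPartitionedFile(InternalRow.empty, "file:///tmp/data.parquet")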
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

trait ShimCometConf {
protected val COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT = true
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetRowIndexUtil
import org.apache.spark.sql.types.StructType

object ShimFileFormat {
// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME

def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int =
ParquetRowIndexUtil.findRowIndexColumnIndexInSchema(sparkSchema)
}
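
A sketch against a hypothetical two-column schema; assuming the usual ParquetRowIndexUtil contract, the helper returns the column's position, or -1 when the temporary column is absent:

  import org.apache.spark.sql.types.{LongType, StructField, StructType}

  val schema = StructType(Seq(
    StructField("id", LongType),
    StructField(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType)))
  // Returns 1: the temporary row-index column sits after "id".
  ShimFileFormat.findRowIndexColumnIndexInSchema(schema)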
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.AccumulatorV2

object ShimTaskMetrics {
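  // `TaskMetrics._externalAccums` is `private[spark]`, which is why this shim sits in
  // the `org.apache.spark.sql.comet.shims` package rather than under `org.apache.comet`.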

def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
taskMetrics._externalAccums.lastOption
}
24 changes: 24 additions & 0 deletions pom.xml
@@ -682,6 +682,30 @@ under the License.
</properties>
</profile>

<profile>
<!-- WIP: Spark 4.1 support, with its own shim sources for 4.1-specific APIs -->
<id>spark-4.1</id>
<properties>
<!-- Spark 4.1.1 is compiled against Scala 2.13.17 and emits calls into stdlib methods
added in that release (e.g. MurmurHash3$.caseClassHash$default$2()). Comet must
match to avoid runtime NoSuchMethodError. Note: semanticdb-scalac_2.13.17 is not
yet published, so the -Psemanticdb / scalafix lint job is skipped for spark-4.1. -->
<scala.version>2.13.17</scala.version>
<scala.binary.version>2.13</scala.binary.version>
<spark.version>4.1.1</spark.version>
<spark.version.short>4.1</spark.version.short>
<parquet.version>1.16.0</parquet.version>
<semanticdb.version>4.13.6</semanticdb.version>
<slf4j.version>2.0.17</slf4j.version>
<shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
<!-- Use jdk17 by default -->
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
</profile>
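      <!-- For reference, the compile-only CI job activates this profile with:
           ./mvnw -B install -DskipTests -Dmaven.test.skip=true -Pspark-4.1 -->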

<profile>
<id>scala-2.12</id>
</profile>
25 changes: 25 additions & 0 deletions spark/pom.xml
@@ -273,6 +273,31 @@ under the License.
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-4.1</id>
<dependencies>
<!-- iceberg-spark-runtime-4.1 is not yet published; reuse the 4.0 runtime -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-4.0_${scala.binary.version}</artifactId>
<version>1.10.0</version>
<scope>test</scope>
</dependency>
<!-- Jetty 11.x for Spark 4.1 (jakarta.servlet); matches Spark 4.1.1's jetty.version -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>11.0.26</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>11.0.26</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>generate-docs</id>
<build>
@@ -41,7 +41,6 @@
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
@@ -171,8 +170,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
mapOutputWriter
.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
.getPartitionLengths();
mapStatus =
MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
return;
}
final long openStartTime = System.nanoTime();
@@ -262,7 +260,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

// TODO: We probably can move checksum generation here when concatenating partition files
partitionLengths = writePartitionedData(mapOutputWriter);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
@@ -50,7 +50,6 @@
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.BaseShuffleHandle;
@@ -288,7 +287,7 @@ void closeAndWriteOutput() throws IOException {
}
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
mapStatus = MapStatusHelper.apply(blockManager.shuffleServerId(), partitionLengths, mapId);
}

@VisibleForTesting
@@ -165,6 +165,10 @@ object CometSparkSessionExtensions extends Logging {
org.apache.spark.SPARK_VERSION >= "4.0"
}

def isSpark41Plus: Boolean = {
org.apache.spark.SPARK_VERSION >= "4.1"
}

/**
* Whether we should override Spark memory configuration for Comet. This only returns true when
* Comet native execution is enabled and/or Comet shuffle is enabled and Comet doesn't use
@@ -219,7 +219,7 @@ object CometSum extends CometAggregateExpressionSerde[Sum] {
return None
}

val evalMode = sum.evalMode
val evalMode = CometEvalModeUtil.sumEvalMode(sum)

val childExpr = exprToProto(sum.child, inputs, binding)
val dataType = serializeDataType(sum.dataType)
@@ -19,7 +19,6 @@

package org.apache.spark.sql.comet.execution.shuffle

import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

import scala.jdk.CollectionConverters._
@@ -101,7 +100,9 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
case 2 =>
c.newInstance(conf, null).asInstanceOf[IndexShuffleBlockResolver]
case 3 =>
c.newInstance(conf, null, Collections.emptyMap())
// Spark 4.1 changed the third parameter type from java.util.Map to
// java.util.concurrent.ConcurrentMap. ConcurrentHashMap satisfies both.
c.newInstance(conf, null, new ConcurrentHashMap[Int, OpenHashSet[Long]]())
.asInstanceOf[IndexShuffleBlockResolver]
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.execution.shuffle

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

// Spark 4.1 added a `checksumVal` parameter (with a default of 0) to MapStatus.apply.
// Java callers can't use Scala default parameters, so we wrap the call here; the Scala
// compiler resolves the right MapStatus.apply for whichever Spark version Comet is
// compiled against, supplying the default where one exists.
object MapStatusHelper {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus =
MapStatus(loc, uncompressedSizes, mapTaskId)
}
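
A minimal sketch with hypothetical values, showing that the helper and a direct Scala call (which can supply the Spark 4.1 default `checksumVal` itself) resolve to the same MapStatus.apply:

  import org.apache.spark.scheduler.MapStatus
  import org.apache.spark.storage.BlockManagerId

  val loc = BlockManagerId("exec-1", "localhost", 7337)
  val partitionLengths = Array(128L, 0L, 64L)
  // Java shuffle writers in this PR can only use the helper form.
  val fromHelper = MapStatusHelper.apply(loc, partitionLengths, 1L)
  // Scala callers could call MapStatus.apply directly with the same result.
  val direct = MapStatus(loc, partitionLengths, 1L)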