Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
756e5f3
Add classic.SparkSessionProvider
May 24, 2026
3b2c7cd
Move test.SharedSparkSessionBase functionality to sql.SharedSparkSession
May 28, 2026
67dd5f0
[API CHANGE]: Move doThreadPreAudit, doThreadPostAudit to sql.SharedS…
May 28, 2026
49778ad
Rename sql.SharedSparkSession to sql.SparkSessionBinder to prevent sh…
May 28, 2026
3a6caaa
Deprecate test.SharedSparkSession
May 28, 2026
cbda4b5
Add connect.SparkSession{Provider,Binder}, connect.QueryTest and demo
May 25, 2026
284c012
Add classic.SparkSessionBinder with usage demonstration
May 28, 2026
af8915d
fixup: fix compile error
May 28, 2026
d407555
Restructure so that SparkSessionBinder implements QueryTest, address …
May 29, 2026
56b9281
fixup
May 29, 2026
442ff43
Have SharedSparkSession as empty alias of classic.SparkSessionBinder
May 29, 2026
36e2940
fixup
May 29, 2026
2603ca7
partial refactor of connect/classic test
Jun 3, 2026
8d5f248
rest of refactor
Jun 3, 2026
749120e
Add example suite
Jun 3, 2026
0b427b4
Minimize sql.SparkSessionBinder stuff
Jun 9, 2026
6e81376
WIP
Jun 9, 2026
9b0b938
WIP: partially refactor DSv2IncrementallyConstructedQueryTests.scala
Jun 9, 2026
82185e2
Remove extra checkAnswer helpers to have on thing to override
Jun 10, 2026
473f4ff
Add SessionQueryTest::sessionType
Jun 10, 2026
33c369a
Fix DSv2ExternalMutationTestBase by replacing 'isConnect' with sessio…
Jun 10, 2026
c847392
Remove unused imports
Jun 10, 2026
a9aede7
WIP
Jun 10, 2026
3ab9157
reset connect session in beforeEach/afterEach
Jun 12, 2026
772a8c6
Shutdown SparkConnectServer at end of afterAll, don't silence shutdow…
Jun 12, 2026
f7ab7e4
fix server session access in DataSourceV2DataFrameConnectSuite
Jun 12, 2026
3db84f3
Extract CheckError into CheckErrorHelper
Jun 15, 2026
8ded0b2
smash
Jun 15, 2026
02308de
add, fix scaladoc
Jun 15, 2026
cd8f516
Update, extend deprecation annotations
Jun 15, 2026
292dee4
Add missing newline at EOF
Jun 15, 2026
2958be0
Remove unused import
Jun 16, 2026
4e0335e
Merge branch 'master' into sharedsparksession-refactor-mostly-nonbrea…
Jun 17, 2026
88292e4
fixup: CheckErrorHelper
Jun 22, 2026
bcb97fd
Only use SessionQueryTestBase in DSv2ExternalMutationTestBase
Jun 22, 2026
6ca5822
Add SQLConfHelper to SessionQueryTestBase
Jun 22, 2026
a89b8b0
Fix QueryTest::checkAnswer
Jun 22, 2026
fc5b673
Catch analysis-time failure in CheckAnswerHelper
Jun 22, 2026
5bc0c8d
CheckAnswerHelper: limit df.queryExec access to classic dfs
Jun 22, 2026
3a41ba1
fix grammar
Jun 22, 2026
9946e07
fix capitalization
Jun 22, 2026
ec566ff
docstring fix
Jun 22, 2026
a2872d2
Don't add hive.SessionQueryTest (yet)
Jun 22, 2026
53a5b2c
Add docstring for connect.SessionQueryTest::isDfSorted
Jun 22, 2026
ef19ed4
Document that SparkSessionBinderBase is temporary
Jun 22, 2026
02e8c28
use precomputed analyzedDF instead of df in checkAnswer
Jun 23, 2026
8e83141
make connect.SparkSessionBinder::classicSession private
Jun 23, 2026
a9acd76
fixup: rename
Jun 23, 2026
13da2a0
add example testcase with conf stuff
Jun 23, 2026
03f4912
SessionQueryTestBase declares 'withConf' instead of extending SQLConf…
Jun 23, 2026
ca341c6
add connect isDfSorted and connect.DataFrame::explainString
Jun 23, 2026
9d0f4b3
fixup! add connect isDfSorted and connect.DataFrame::explainString
Jun 23, 2026
cd7183a
fixup! add connect isDfSorted and connect.DataFrame::explainString
Jun 23, 2026
895ff8d
deprecate rarely used method in SharedSparkSession
Jun 23, 2026
deac517
WIP: provide isDfSorted override for connect
Jun 23, 2026
783681f
fixup! add connect isDfSorted and connect.DataFrame::explainString
Jun 23, 2026
c009270
Don't deprecate SharedSparkSession[Base] yet
Jun 23, 2026
176a528
Don't refactor DSv2 classic/connect tests (yet)
Jun 23, 2026
ea62c55
refactor some DSv2 classic/connect tests to use SessionQueryTest
Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions core/src/test/scala/org/apache/spark/CheckErrorHelper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import org.scalatest.Suite

trait CheckErrorHelper { self: Suite =>

case class ExpectedContext(
contextType: QueryContextType,
objectType: String,
objectName: String,
startIndex: Int,
stopIndex: Int,
fragment: String,
callSitePattern: String
)

object ExpectedContext {
def apply(fragment: String, start: Int, stop: Int): ExpectedContext = {
ExpectedContext("", "", start, stop, fragment)
}

// Check the fragment only. This is only used when the fragment is distinguished within
// the query text
def apply(fragment: String): ExpectedContext = {
ExpectedContext("", "", -1, -1, fragment)
}

def apply(
objectType: String,
objectName: String,
startIndex: Int,
stopIndex: Int,
fragment: String): ExpectedContext = {
new ExpectedContext(QueryContextType.SQL, objectType, objectName, startIndex, stopIndex,
fragment, "")
}

def apply(fragment: String, callSitePattern: String): ExpectedContext = {
new ExpectedContext(QueryContextType.DataFrame, "", "", -1, -1, fragment, callSitePattern)
}
}

/**
* Parameter keys that are omitted from comparison when absent from the expected map.
* For each error condition, the set lists keys that are removed from the actual
* exception parameters before comparison with the expected map.
* Test suites may override this to add or change ignorable parameters per condition.
*/
protected def checkErrorIgnorableParameters: Map[String, Set[String]] = Map(
"TABLE_OR_VIEW_NOT_FOUND" -> Set("searchPath")
)

/**
* Checks an exception with an error condition against expected results.
* @param exception The exception to check
* @param condition The expected error condition identifying the error
* @param sqlState Optional the expected SQLSTATE, not verified if not supplied
* @param parameters A map of parameter names and values. The names are as defined
* in the error-classes file.
* @param matchPVals Optionally treat the parameters value as regular expression pattern.
* false if not supplied.
*/
protected def checkError(
exception: SparkThrowable,
condition: String,
sqlState: Option[String] = None,
parameters: Map[String, String] = Map.empty,
matchPVals: Boolean = false,
queryContext: Array[ExpectedContext] = Array.empty): Unit = {
val mismatches = new ListBuffer[String]

if (exception.getCondition != condition) {
mismatches += s"condition: expected '$condition' but got '${exception.getCondition}'"
}
sqlState.foreach { state =>
if (exception.getSqlState != state) {
mismatches += s"sqlState: expected '$state' but got '${exception.getSqlState}'"
}
}

val actualParameters = exception.getMessageParameters.asScala
val ignorable = checkErrorIgnorableParameters.getOrElse(condition, Set.empty[String])
val actualParametersToCompare = actualParameters.filter { case (k, _) =>
!ignorable.contains(k) || parameters.contains(k)
}
if (matchPVals) {
if (actualParametersToCompare.size != parameters.size) {
mismatches += s"parameters size: expected ${parameters.size} but got" +
s" ${actualParametersToCompare.size}"
}
actualParametersToCompare.foreach { case (key, actualVal) =>
parameters.get(key) match {
case None =>
mismatches += s"parameters: unexpected key '$key' with value '$actualVal'"
case Some(pattern) if !actualVal.matches(pattern) =>
mismatches += s"parameters['$key']: value '$actualVal' does not match pattern" +
s" '$pattern'"
case _ =>
}
}
parameters.keys.filterNot(actualParametersToCompare.contains).foreach { key =>
mismatches += s"parameters: missing expected key '$key'"
}
} else if (actualParametersToCompare != parameters) {
mismatches += s"parameters: expected $parameters but got $actualParametersToCompare"
}

val actualQueryContext = exception.getQueryContext()
if (actualQueryContext.length != queryContext.length) {
mismatches += s"queryContext.length: expected ${queryContext.length}" +
s" but got ${actualQueryContext.length}"
}
actualQueryContext.zip(queryContext).zipWithIndex.foreach {
case ((actual, expected), idx) =>
if (actual.contextType() != expected.contextType) {
mismatches += s"queryContext[$idx].contextType: expected ${expected.contextType}" +
s" but got ${actual.contextType()}"
}
if (actual.contextType() == QueryContextType.SQL) {
if (actual.objectType() != expected.objectType) {
mismatches += s"queryContext[$idx].objectType: expected '${expected.objectType}'" +
s" but got '${actual.objectType()}'"
}
if (actual.objectName() != expected.objectName) {
mismatches += s"queryContext[$idx].objectName: expected '${expected.objectName}'" +
s" but got '${actual.objectName()}'"
}
// If startIndex and stopIndex are -1, it means we simply want to check the
// fragment of the query context. This should be the case when the fragment is
// distinguished within the query text.
if (expected.startIndex != -1 && actual.startIndex() != expected.startIndex) {
mismatches += s"queryContext[$idx].startIndex: expected ${expected.startIndex}" +
s" but got ${actual.startIndex()}"
}
if (expected.stopIndex != -1 && actual.stopIndex() != expected.stopIndex) {
mismatches += s"queryContext[$idx].stopIndex: expected ${expected.stopIndex}" +
s" but got ${actual.stopIndex()}"
}
if (actual.fragment() != expected.fragment) {
mismatches += s"queryContext[$idx].fragment: expected '${expected.fragment}'" +
s" but got '${actual.fragment()}'"
}
} else if (actual.contextType() == QueryContextType.DataFrame) {
if (actual.fragment() != expected.fragment) {
mismatches += s"queryContext[$idx].fragment: expected '${expected.fragment}'" +
s" but got '${actual.fragment()}'"
}
if (expected.callSitePattern.nonEmpty &&
!actual.callSite().matches(expected.callSitePattern)) {
mismatches += s"queryContext[$idx].callSite: '${actual.callSite()}'" +
s" does not match pattern '${expected.callSitePattern}'"
}
}
}

if (mismatches.nonEmpty) {
val sb = new StringBuilder
sb.append(s"checkError found ${mismatches.size} mismatch(es).\n\n")
sb.append("=== Actual Exception State ===\n")
sb.append(s" condition: ${exception.getCondition}\n")
sb.append(s" sqlState: ${exception.getSqlState}\n")
sb.append(s" parameters:\n")
if (actualParameters.isEmpty) {
sb.append(" (empty)\n")
} else {
actualParameters.foreach { case (k, v) => sb.append(s" $k -> $v\n") }
}
actualQueryContext.zipWithIndex.foreach { case (ctx, idx) =>
sb.append(s" queryContext[$idx] (${ctx.contextType()}):\n")
if (ctx.contextType() == QueryContextType.SQL) {
sb.append(s" objectType: ${ctx.objectType()}\n")
sb.append(s" objectName: ${ctx.objectName()}\n")
sb.append(s" startIndex: ${ctx.startIndex()}\n")
sb.append(s" stopIndex: ${ctx.stopIndex()}\n")
sb.append(s" fragment: ${ctx.fragment()}\n")
} else if (ctx.contextType() == QueryContextType.DataFrame) {
sb.append(s" fragment: ${ctx.fragment()}\n")
sb.append(s" callSite: ${ctx.callSite()}\n")
}
}
sb.append("\n=== Mismatches ===\n")
mismatches.foreach(m => sb.append(s" $m\n"))
fail(sb.toString())
}
}
}
Loading