diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 1e3acbc001b3c..5761028f60234 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -217,6 +217,10 @@ singleTableSchema : colTypeList EOF ; +singlePathElementList + : pathElement (COMMA pathElement)* EOF + ; + singleRoutineParamList : colDefinitionList EOF ; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java index 23f3acc7230fa..20586f57bcfdd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java @@ -28,9 +28,10 @@ * views, and functions. *

* Catalog implementations must implement this marker interface to be loaded by - * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the + * {@link org.apache.spark.sql.connector.catalog.Catalogs#load(String,SQLConf)}. + * The loader will instantiate catalog classes using the * required public no-arg constructor. After creating an instance, it will be configured by calling - * {@link #initialize(String, CaseInsensitiveStringMap)}. + * {@link #initialize(String,CaseInsensitiveStringMap)}. *

* Catalog implementations are registered to a name by adding a configuration option to Spark: * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties @@ -56,8 +57,8 @@ public interface CatalogPlugin { /** * Called to get this catalog's name. *

- * This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is - * called to pass the catalog's name. + * This method is only called after + * {@link #initialize(String,CaseInsensitiveStringMap)} is called to pass the catalog's name. */ String name(); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 95b5c54f782fc..f31354179674e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1976,14 +1976,15 @@ class Analyzer( * This is used for special syntax transformations (e.g., COUNT(*) -> COUNT(1)) that * should only apply to builtin functions, not to user-defined functions. * - * In legacy mode (sessionOrder="first"), temp functions shadow builtins, so an - * unqualified name that matches a temp function should NOT be treated as builtin. + * When the effective SQL PATH puts `system.session` before `system.builtin`, temp + * functions shadow builtins, so an unqualified name that matches a temp function + * should NOT be treated as builtin. */ private def matchesFunctionName(nameParts: Seq[String], expectedName: String): Boolean = { if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, expectedName)) { return false } - if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == "first") { + if (nameParts.size == 1 && functionResolution.isSessionBeforeBuiltinInPath) { val v1Catalog = catalogManager.v1SessionCatalog !v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head)) } else { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala index 4721425d2cb55..e3dbcc4b6ef7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala @@ -76,6 +76,22 @@ class FunctionResolution( nameParts.length == 3 && nameParts.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) + /** + * True iff `system.session` is searched before `system.builtin` in the effective SQL PATH. + * + * Drives the `count(*) -> count(1)` rewrite (which must skip transformation when a temp + * `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating + * a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and + * applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so + * the predicate stays in sync with the lookup loop's actual order. + */ + def isSessionBeforeBuiltinInPath: Boolean = { + val path = catalogManager.sqlResolutionPathEntries( + catalogManager.currentCatalog.name(), catalogManager.currentNamespace.toSeq) + CatalogManager.systemFunctionKindsFromPath(path).headOption + .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp) + } + /** * Produces the ordered list of candidate names for resolution. Expansion happens in two cases: * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala index 1a8658bb764d5..dd70963a79841 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala @@ -60,7 +60,7 @@ import org.apache.spark.sql.catalyst.util.CollationFactory */ class FunctionResolver( expressionResolver: ExpressionResolver, - functionResolution: FunctionResolution, + protected val functionResolution: FunctionResolution, aggregateExpressionResolver: AggregateExpressionResolver, binaryArithmeticResolver: BinaryArithmeticResolver) extends TreeNodeResolver[UnresolvedFunction, Expression] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala index 503c94fc9cdf6..3c5a3f1832e8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis.resolver import java.util.Locale +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.{ + FunctionResolution, ResolvedStar, Star, UnresolvedFunction, @@ -35,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf */ trait FunctionResolverUtils { protected def expressionResolver: ExpressionResolver + protected def functionResolution: FunctionResolution protected def conf: SQLConf private val scopes = expressionResolver.getNameScopes @@ -99,7 +102,21 @@ trait FunctionResolverUtils { unresolvedFunction: UnresolvedFunction, normalizeFunctionName: Boolean = true ): Boolean = { - !unresolvedFunction.isDistinct && isCount(unresolvedFunction, normalizeFunctionName) + !unresolvedFunction.isDistinct && + isCount(unresolvedFunction, normalizeFunctionName) && + !isUnqualifiedCountShadowedByTemp(unresolvedFunction) + } + + /** + * Keep single-pass behavior aligned with fixed-point: when PATH puts system.session before + * system.builtin and a temp `count` exists, unqualified `count(*)` must not be rewritten to + * `count(1)`. + */ + private def isUnqualifiedCountShadowedByTemp(unresolvedFunction: UnresolvedFunction): Boolean = { + unresolvedFunction.nameParts.length == 1 && + functionResolution.isSessionBeforeBuiltinInPath && + functionResolution.catalogManager.v1SessionCatalog + .isTemporaryFunction(FunctionIdentifier(unresolvedFunction.nameParts.head)) } private def isCount( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala index 6b90a5c05baf1..676ef381f2f17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors */ class HigherOrderFunctionResolver( protected val expressionResolver: ExpressionResolver, - functionResolution: FunctionResolution) + protected val functionResolution: FunctionResolution) extends TreeNodeResolver[UnresolvedFunction, Expression] with ProducesUnresolvedSubtree with CoercesExpressionTypes diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 32aa8cccbd93a..249700ec0d92d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -113,18 +113,67 @@ class SessionCatalog( identifier.copy(funcName = "") == SESSION_NAMESPACE_TEMPLATE /** - * Session function kinds in resolution order for unqualified lookups. - * Matches [[SQLConf.sessionFunctionResolutionOrder]]: "first" (session first), - * "second" (default), "last" (builtin only; session tried after persistent). + * When set, unqualified builtin/temp function resolution uses this fixed kind order instead of + * [[catalogManagerForSessionFunctionKinds]] / [[SQLConf.systemPathOrder]]. For unit tests only; + * production relies on the catalog manager binding. */ - private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] = { - conf.sessionFunctionResolutionOrder match { - case "first" => Seq(Temp, Builtin) - case "last" => Seq(Builtin) - case _ => Seq(Builtin, Temp) // "second" (default) - } + @volatile private var sessionFunctionKindsTestOverride: Option[Seq[SessionFunctionKind]] = None + + /** + * Live PATH for session function kinds. Set from + * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor via + * [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and the security check + * that blocks temp functions from shadowing builtins read the effective SQL PATH (post-`SET + * PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] fallbacks already + * applied). + * + * When unset (e.g. standalone [[SessionCatalog]] in tests), kinds derive from + * [[SQLConf.systemPathOrder]] -- the seeded default path -- without assuming other legacy + * resolution-order conf beyond seeding `defaultPathOrder`. + */ + @volatile private var catalogManagerForSessionFunctionKinds: Option[CatalogManager] = None + + /** + * Wire live PATH-derived session function kinds from the session [[CatalogManager]]. + * Called once from [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor. + */ + private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: CatalogManager): Unit = { + catalogManagerForSessionFunctionKinds = Some(cm) + } + + /** + * Pin session function kinds for tests (`None` clears). Uses `private[sql]` so tests under the + * `org.apache.spark.sql` package can control ordering without a public catalog API. + */ + private[sql] def setSessionFunctionKindsTestOverride( + kinds: Option[Seq[SessionFunctionKind]]): Unit = { + sessionFunctionKindsTestOverride = kinds } + /** + * Session function kinds in resolution order for unqualified lookups: test override if set, + * else live PATH from [[catalogManagerForSessionFunctionKinds]], else + * [[SQLConf.systemPathOrder]]. + */ + private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] = + sessionFunctionKindsTestOverride.getOrElse { + catalogManagerForSessionFunctionKinds match { + case Some(cm) => + CatalogManager.systemFunctionKindsFromPath( + cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq)) + case None => + CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder) + } + } + + /** + * True iff the effective SQL PATH searches `system.session` before `system.builtin`. Used + * to gate the security check that blocks temporary functions from silently shadowing a + * builtin of the same name. + */ + private def sessionFirstInPath: Boolean = + sessionFunctionKindsInResolutionOrder.headOption.contains(Temp) + /** * Checks if a namespace represents temporary functions. */ @@ -2080,12 +2129,11 @@ class SessionCatalog( qualifyIdentifier(func) } - // Security check: When legacy mode is enabled, block SQL-created temporary functions - // from shadowing builtin functions (to preserve master behavior) - // Scala UDFs are still allowed to shadow in legacy mode - // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function already exists - val sessionFirst = conf.sessionFunctionResolutionOrder == "first" - if (func.database.isEmpty && sessionFirst && !overrideIfExists) { + // Security check: when the effective SQL PATH searches `system.session` before + // `system.builtin`, block creating an unqualified temporary function whose name + // collides with a builtin so it cannot silently shadow that builtin via unqualified + // resolution. We throw ROUTINE_ALREADY_EXISTS to indicate the conflict. + if (func.database.isEmpty && sessionFirstInPath && !overrideIfExists) { val funcName = func.funcName // Check if function exists in builtin namespace (extensions are stored as builtins) val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName) @@ -2195,10 +2243,11 @@ class SessionCatalog( // Use FunctionIdentifier with session namespace for temporary functions val tempIdentifier = tempFunctionIdentifier(function.name.funcName) - // Security check: When legacy mode is enabled, block SQL-created temporary functions - // from shadowing builtin functions (including extensions) as a safeguard - // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function already exists - if ((conf.sessionFunctionResolutionOrder == "first") && !overrideIfExists) { + // Security check: when the effective SQL PATH searches `system.session` before + // `system.builtin`, block creating an unqualified temporary function whose name + // collides with a builtin (including extensions) so it cannot silently shadow that + // builtin via unqualified resolution. + if (sessionFirstInPath && !overrideIfExists) { val funcName = function.name.funcName // Check if function exists in builtin namespace (extensions are stored as builtins) val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName) @@ -2499,7 +2548,12 @@ class SessionCatalog( * Look up the `ExpressionInfo` of the given function by name. * Resolution order follows the configured path (e.g. builtin then session). */ - def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = synchronized { + def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = { + // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution order may call + // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]]), which can + // synchronize on the manager; another + // thread can hold that lock and call into this catalog (e.g. via `setCurrentNamespace`), + // which would deadlock if this method also synchronized on `this`. lookupFunctionWithShadowing(name, tableFunctionRegistry, checkBuiltinOperators = false) } @@ -2650,7 +2704,11 @@ class SessionCatalog( /** * Look up the [[ExpressionInfo]] associated with the specified function, assuming it exists. */ - def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized { + def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = { + // Intentionally not `synchronized` on this [[SessionCatalog]] (see + // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp resolution uses + // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must not run under + // this catalog's intrinsic lock. if (name.database.isEmpty) { lookupBuiltinOrTempFunction(name.funcName) .orElse(lookupBuiltinOrTempTableFunction(name.funcName)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala index 216136d8a7c82..29bf924f244e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin import org.apache.spark.sql.catalyst.plans.logical.{CompoundPlanStatement, LogicalPlan} import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.sql.connector.catalog.PathElement import org.apache.spark.sql.errors.QueryParsingErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -110,6 +111,18 @@ abstract class AbstractSqlParser extends AbstractParser with ParserInterface { } } + /** + * Parse the right-hand side of `SET PATH = ...` (a comma-separated list of path elements). + * Used by [[org.apache.spark.sql.connector.catalog.CatalogManager]] to honor the + * [[SQLConf.DEFAULT_PATH]] conf without re-implementing the SET PATH grammar. + */ + private[sql] def parsePathElements(sqlText: String): Seq[PathElement] = parse(sqlText) { parser => + val ctx = parser.singlePathElementList() + withErrorHandling(ctx, Some(sqlText)) { + astBuilder.visitSinglePathElementList(ctx) + } + } + def withErrorHandling[T](ctx: ParserRuleContext, sqlText: Option[String])(toResult: => T): T = { withOrigin(ctx, sqlText) { try { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index b3a4e05e82ff2..a6906be6d794d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, SupportsNamespaces, TableCatalog, TableWritePrivilege} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege} import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} @@ -708,6 +708,26 @@ class AstBuilder extends DataTypeAstBuilder visitMultipartIdentifier(ctx.multipartIdentifier) } + override def visitSinglePathElementList( + ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) { + ctx.pathElement().asScala.map(visitPathElement).toSeq + } + + override def visitPathElement(ctx: PathElementContext): PathElement = withOrigin(ctx) { + if (ctx.DEFAULT_PATH() != null) PathElement.DefaultPath + else if (ctx.SYSTEM_PATH() != null) PathElement.SystemPath + else if (ctx.PATH() != null) PathElement.PathRef + else if (ctx.CURRENT_DATABASE() != null || ctx.CURRENT_SCHEMA() != null) { + PathElement.CurrentSchema + } else { + val parts = visitMultipartIdentifier(ctx.multipartIdentifier()) + if (parts.length < 2) { + throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) + } + PathElement.SchemaInPath(parts) + } + } + override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) { typedVisit[DataType](ctx.dataType) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index f73b0e68cecce..9a39e4ebdd272 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.connector.catalog +import java.util.concurrent.atomic.AtomicReference + import scala.collection.mutable import scala.util.Try @@ -24,6 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, TempVariableManager} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.errors.QueryCompilationErrors @@ -52,6 +55,12 @@ class CatalogManager( // TODO: create a real SYSTEM catalog to host `TempVariableManager` under the SESSION namespace. val tempVariableManager: TempVariableManager = new TempVariableManager + // Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds list itself is + // pure data conversion (system entries from the path, in path order); the *decision* to use + // path-order kinds for unqualified lookups lives at the Strategy layer (see callers of + // [[CatalogManager.systemFunctionKindsFromPath]]). + v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this) + def catalog(name: String): CatalogPlugin = synchronized { if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) { v2SessionCatalog @@ -138,8 +147,62 @@ class CatalogManager( private var _sessionPath: Option[Seq[SessionPathEntry]] = None - /** Returns the raw stored session path entries, or None if no path is set. */ - def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath } + /** + * Cache for [[confDefaultPathEntries]]: stores the expanded [[SessionPathEntry]] list keyed + * on the trimmed [[SQLConf#DEFAULT_PATH]] string and + * [[SQLConf#SESSION_FUNCTION_RESOLUTION_ORDER]] value (the only conf that affects the + * expansion of `DEFAULT_PATH` / `SYSTEM_PATH` tokens). + * `CurrentSchemaEntry` markers are preserved unresolved so the cache stays valid across + * `USE SCHEMA`. + */ + private val confDefaultPathCache = + new AtomicReference[Option[(String, String, Seq[SessionPathEntry])]](None) + + /** + * Returns the effective session path entries: the explicit `SET PATH` value if stored, + * else the parsed [[SQLConf#DEFAULT_PATH]] conf if non-empty (mirroring how + * [[currentCatalog]] falls back to [[SQLConf#DEFAULT_CATALOG]]). Returns `None` when + * [[SQLConf#PATH_ENABLED]] is false or both sources are empty. + */ + def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { + if (!conf.pathEnabled) None + else _sessionPath.orElse(confDefaultPathEntries) + } + + /** Raw `_sessionPath` (post-`SET PATH`), without the [[SQLConf#DEFAULT_PATH]] fallback. */ + def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath } + + /** + * Parsed and expanded [[SQLConf#DEFAULT_PATH]] value, or `None` when the conf is empty. + * Reuses the SET PATH grammar via + * [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parsePathElements]] (via + * [[org.apache.spark.sql.catalyst.parser.CatalystSqlParser]]). An inner + * `DEFAULT_PATH` token resolves to the spark-builtin default ordering (cycle break). + * + * Unlike `SET PATH`, this does NOT run a duplicate check: lookup uses first-match + * resolution, so any redundant entry (including ones that only collide after a later + * `USE SCHEMA`) is dead code rather than an error. Cached so the hot path is a single + * atomic load on conf-stable sessions. + */ + def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = { + val confValue = conf.defaultPath + if (confValue == null || confValue.trim.isEmpty) { + confDefaultPathCache.set(None) + None + } else { + val trimmed = confValue.trim + val sessionOrder = conf.sessionFunctionResolutionOrder + val expanded = confDefaultPathCache.get() match { + case Some((k, ord, cached)) if k == trimmed && ord == sessionOrder => cached + case _ => + val elements = CatalystSqlParser.parsePathElements(trimmed) + val computed = PathElement.expand(elements, conf, this, isConfDefaultExpansion = true) + confDefaultPathCache.set(Some((trimmed, sessionOrder, computed))) + computed + } + if (expanded.isEmpty) None else Some(expanded) + } + } def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized { _sessionPath = Some(entries) @@ -150,18 +213,18 @@ class CatalogManager( } private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized { - _sessionPath = other.sessionPathEntries + _sessionPath = other.storedSessionPathEntries } /** * String form of the current resolution path for CURRENT_PATH(). - * When PATH is enabled and a session path is stored, formats the effective path entries - * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath. + * When PATH is enabled and a session path is in effect (stored or via + * [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls back to the legacy + * resolutionSearchPath. */ def currentPathString: String = synchronized { import CatalogV2Implicits._ - val stored = if (conf.pathEnabled) _sessionPath else None - stored match { + sessionPathEntries match { case Some(entries) => val resolved = CatalogManager.resolvePathEntries( entries, currentCatalog.name(), currentNamespace.toSeq) @@ -174,8 +237,9 @@ class CatalogManager( /** * Ordered catalog/schema path entries for resolving unqualified SQL object names. - * When PATH is off or unset, applies [[SQLConf.defaultPathOrder]] (legacy). - * When PATH is explicitly set, uses the resolved stored path entries. + * When PATH is off or unset, applies [[SQLConf#defaultPathOrder]] (legacy). + * When PATH is in effect (stored or via the [[SQLConf#DEFAULT_PATH]] conf), uses the + * resolved entries. */ def sqlResolutionPathEntries( pathDefaultCatalog: String, @@ -185,8 +249,7 @@ class CatalogManager( val defaultEntry = if (pathDefaultNamespace.isEmpty) Seq(pathDefaultCatalog) else pathDefaultCatalog +: pathDefaultNamespace - val stored = if (conf.pathEnabled) _sessionPath else None - stored match { + sessionPathEntries match { case Some(entries) => CatalogManager.resolvePathEntries(entries, expandCatalog, expandNamespace) case None => @@ -204,16 +267,17 @@ class CatalogManager( /** * True if `system.session` is on the SQL path. Only literal path entries can match: the - * [[CurrentSchemaEntry]] marker expands to `currentCatalog.name() +: currentNamespace`, and + * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] marker expands to + * `currentCatalog.name() +: currentNamespace`, and * `system` is not a registered catalog (it is a synthetic namespace served via * [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] / `lookupBuiltinOrTempFunction`, * not loadable via [[catalog]]), so `currentCatalog.name()` cannot be `"system"`. If that * invariant ever changes, this short-circuit must be revisited. - * Inspecting stored entries directly avoids loading the configured default catalog. + * Inspecting effective entries directly avoids loading the configured default catalog. */ def isSystemSessionOnPath: Boolean = synchronized { if (!conf.pathEnabled) return true - _sessionPath match { + sessionPathEntries match { case None => true case Some(entries) => entries.exists { case CatalogManager.LiteralPathEntry(parts) => @@ -289,6 +353,7 @@ class CatalogManager( _currentNamespace = None _currentCatalogName = None _sessionPath = None + confDefaultPathCache.set(None) v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } @@ -333,16 +398,37 @@ private[sql] object CatalogManager extends Logging { /** * True if the multipart name uses the session temp view namespace: two-part `session.view` * or three-part `system.session.view`. The two-part form can also denote a persistent relation - * in schema `session`; resolution order is controlled by [[SQLConf.prioritizeSystemCatalog]]. + * in schema `session`; resolution order is controlled by [[SQLConf#prioritizeSystemCatalog]]. */ def isSessionQualifiedViewName(nameParts: Seq[String]): Boolean = { (nameParts.length == 2 && nameParts.head.equalsIgnoreCase(SESSION_NAMESPACE)) || isFullyQualifiedSystemSessionViewName(nameParts) } - /** True if a SQL path entry is the well-known `system.session` entry. */ + /** True if a SQL path entry is the well-known `system.session` entry (case-insensitive). */ def isSystemSessionPathEntry(parts: Seq[String]): Boolean = - parts == Seq(SYSTEM_CATALOG_NAME, SESSION_NAMESPACE) + parts.length == 2 && + parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && + parts(1).equalsIgnoreCase(SESSION_NAMESPACE) + + /** True if a SQL path entry is the well-known `system.builtin` entry (case-insensitive). */ + def isSystemBuiltinPathEntry(parts: Seq[String]): Boolean = + parts.length == 2 && + parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) && + parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE) + + /** + * Extract `system.builtin` / `system.session` entries from a resolved PATH, mapped to + * [[SessionCatalog.SessionFunctionKind]] in path order. Pure data conversion -- callers + * decide whether and how to use this list. + */ + def systemFunctionKindsFromPath( + path: Seq[Seq[String]]): Seq[SessionCatalog.SessionFunctionKind] = + path.flatMap { e => + if (isSystemBuiltinPathEntry(e)) Some(SessionCatalog.Builtin) + else if (isSystemSessionPathEntry(e)) Some(SessionCatalog.Temp) + else None + } /** * A single entry in the session SQL path: either a literal schema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala new file mode 100644 index 0000000000000..ee9959762da9e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import java.util.Locale + +import scala.collection.mutable + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.CatalogManager.{ + CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry +} +import org.apache.spark.sql.internal.SQLConf + +/** + * One element on the right-hand side of `SET PATH = ...`: either a well-known shortcut + * keyword (DEFAULT_PATH, SYSTEM_PATH, PATH, CURRENT_SCHEMA / CURRENT_DATABASE) or a + * fully qualified schema reference (`catalog.namespace...` with at least 2 parts). + * + * The same grammar is reused to parse the + * [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] conf value, so this + * AST node lives in catalyst beside [[CatalogManager]] rather than in the runtime + * [[org.apache.spark.sql.execution.command.SetPathCommand]]. + */ +private[sql] sealed trait PathElement + +private[sql] object PathElement { + case object DefaultPath extends PathElement + case object SystemPath extends PathElement + case object PathRef extends PathElement + + /** + * Current database/schema (SQL aliases). Stored as the + * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] + * marker so resolution candidates expand against the live `USE SCHEMA`. + */ + case object CurrentSchema extends PathElement + + /** Fully qualified schema reference (`catalog.namespace...`). Must have at least 2 parts. */ + case class SchemaInPath(parts: Seq[String]) extends PathElement + + /** + * Expand a parsed [[PathElement]] list into concrete [[SessionPathEntry]] entries + * suitable for storing in [[CatalogManager._sessionPath]] or returning from + * [[CatalogManager#sessionPathEntries]]. + * + * @param isConfDefaultExpansion when true, an inner [[DefaultPath]] token resolves + * to the spark-builtin default ordering (cycle break) + * rather than reading + * [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] again. + * Set to true when this method is invoked while + * parsing [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] + * itself. + */ + def expand( + elements: Seq[PathElement], + conf: SQLConf, + catalogManager: CatalogManager, + isConfDefaultExpansion: Boolean = false): Seq[SessionPathEntry] = { + val currentSchemaSentinel = Seq("__current_schema__") + + def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map { + case p if p == currentSchemaSentinel => CurrentSchemaEntry + case p => LiteralPathEntry(p) + } + + def builtinDefaultWithCurrentSchema: Seq[SessionPathEntry] = + toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel))) + + def defaultPathExpansion: Seq[SessionPathEntry] = { + if (isConfDefaultExpansion) { + // Cycle break: inner DEFAULT_PATH inside the conf default value falls back to the + // spark-builtin default ordering instead of recursing. + builtinDefaultWithCurrentSchema + } else { + catalogManager.confDefaultPathEntries.getOrElse(builtinDefaultWithCurrentSchema) + } + } + + elements.flatMap { + case DefaultPath => + defaultPathExpansion + case SystemPath => + toEntries(conf.systemPathOrder) + case CurrentSchema => + Seq(CurrentSchemaEntry) + case PathRef => + catalogManager.storedSessionPathEntries.getOrElse(defaultPathExpansion) + case SchemaInPath(parts) => + Seq(LiteralPathEntry(parts)) + } + } + + /** + * Reject *static* duplicates in a SET PATH entry list: identical + * [[CatalogManager#LiteralPathEntry]] parts and repeated + * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] markers + * (the `current_schema` / `current_database` + * cross-alias case). Used for the interactive `SET PATH` form to surface user typos at + * statement time. + * + * Deliberately does NOT compare a [[CatalogManager#LiteralPathEntry]] against a + * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]]: such a + * "duplicate" depends on the live `USE SCHEMA` + * and is harmless at lookup (first-match resolution skips the dead literal). + * [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] expansion skips this check + * entirely so transient `USE`-induced + * collisions don't wedge unqualified resolution. + */ + def validateNoStaticDuplicates( + entries: Seq[SessionPathEntry], + caseSensitive: Boolean): Seq[SessionPathEntry] = { + val seenLiterals = new mutable.HashSet[Seq[String]] + var seenCurrentSchema = false + entries.foreach { + case CurrentSchemaEntry => + if (seenCurrentSchema) { + throw new AnalysisException( + errorClass = "DUPLICATE_SQL_PATH_ENTRY", + messageParameters = Map("pathEntry" -> "current_schema")) + } + seenCurrentSchema = true + case LiteralPathEntry(parts) => + val key = if (caseSensitive) parts else parts.map(_.toLowerCase(Locale.ROOT)) + if (!seenLiterals.add(key)) { + throw new AnalysisException( + errorClass = "DUPLICATE_SQL_PATH_ENTRY", + messageParameters = Map( + "pathEntry" -> + parts.map(p => if (p.contains(".")) s"`$p`" else p).mkString("."))) + } + } + entries + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 58840ebf09eb5..9e1e0d5d5f958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -43,9 +43,11 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.connector.catalog.PathElement.PathRef import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType} import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} @@ -2471,6 +2473,29 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DEFAULT_PATH = + buildConf("spark.sql.defaultPath") + .version("4.2.0") + .doc("Default SQL PATH used when no SET PATH has been issued in the session; this is " + + "also the value to which `SET PATH = DEFAULT_PATH` expands. Accepts the full SET PATH " + + "grammar; an inner DEFAULT_PATH token resolves to the spark-builtin default ordering. " + + "The PATH keyword is not allowed in this conf value. " + + "When empty, the spark-builtin default ordering controlled by " + + "`spark.sql.functionResolution.sessionOrder` applies. Validated for syntax at set time; " + + "redundant entries are tolerated (lookup uses first-match resolution). The interactive " + + "SET PATH form still rejects static duplicates as a typo guard.") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .stringConf + .checkValue( + v => + v == null || v.trim.isEmpty || + Try(CatalystSqlParser.parsePathElements(v.trim)) + .toOption + .exists(!_.contains(PathRef)), + "The value must be empty or a comma-separated SET PATH element list " + + "(same grammar as SET PATH, except PATH is not allowed).") + .createWithDefault("") + // Whether to retain group by columns or not in GroupedData.agg. val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns") .version("1.4.0") @@ -8555,6 +8580,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED) + def defaultPath: String = getConf(SQLConf.DEFAULT_PATH) + /** * Returns the resolution search path for error messages and resolution order. * This is the single source of truth for the search path used for functions, tables, and views. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8f4c77840f0cc..b4ece7329094e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -359,15 +359,7 @@ class SparkSqlAstBuilder extends AstBuilder { * }}} */ override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) { - val elements = ctx.pathElement().asScala.map { pe => - if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath - else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath - else if (pe.PATH() != null) PathElement.PathRef - else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase - else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema - else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier())) - }.toSeq - SetPathCommand(elements) + SetPathCommand(ctx.pathElement().asScala.map(visitPathElement).toSeq) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala index 70538160eefdb..82ab46ec9b140 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala @@ -17,40 +17,17 @@ package org.apache.spark.sql.execution.command -import java.util.Locale - import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.connector.catalog.CatalogManager.{ - CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry -} -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.connector.catalog.PathElement import org.apache.spark.sql.internal.SQLConf -/** - * Path element for SET PATH: either a well-known shortcut or a fully qualified schema reference. - * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level namespaces are allowed. - */ -sealed trait PathElement - -object PathElement { - case object DefaultPath extends PathElement - case object SystemPath extends PathElement - case object PathRef extends PathElement - /** - * Current database/schema (SQL aliases). Stored as system.current_schema; expands when - * building resolution candidates so later USE SCHEMA is reflected. - */ - case object CurrentDatabase extends PathElement - case object CurrentSchema extends PathElement - /** Fully qualified schema reference (catalog.namespace...). Must have at least 2 parts. */ - case class SchemaInPath(parts: Seq[String]) extends PathElement -} - /** * Command for SET PATH = pathElement (, pathElement)* * Expands shortcuts at run time, validates no duplicates, and sets the internal session path. + * + * The [[PathElement]] AST and its expansion live in catalyst so that the same grammar can be + * reused to parse the [[SQLConf.DEFAULT_PATH]] conf value. */ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableCommand { @@ -64,23 +41,9 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman } val conf = sparkSession.sessionState.conf val catalogManager = sparkSession.sessionState.catalogManager - val currentCatalog = catalogManager.currentCatalog.name - val currentNamespace = catalogManager.currentNamespace.toSeq - val caseSensitive = conf.caseSensitiveAnalysis - val expanded = expandPathElements(elements, conf, catalogManager) - val seen = new scala.collection.mutable.HashSet[Seq[String]] - expanded.foreach { entry => - val concrete = entry.resolve(currentCatalog, currentNamespace) - def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT) - val key = concrete.map(normalize) - if (!seen.add(key)) { - throw new AnalysisException( - errorClass = "DUPLICATE_SQL_PATH_ENTRY", - messageParameters = Map("pathEntry" -> - concrete.map(p => if (p.contains(".")) s"`$p`" else p).mkString("."))) - } - } + val expanded0 = PathElement.expand(elements, conf, catalogManager) + val expanded = PathElement.validateNoStaticDuplicates(expanded0, conf.caseSensitiveAnalysis) if (expanded.isEmpty) { catalogManager.clearSessionPath() @@ -89,36 +52,4 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman } Seq.empty } - - private def expandPathElements( - elements: Seq[PathElement], - conf: SQLConf, - catalogManager: CatalogManager): Seq[SessionPathEntry] = { - val currentSchemaSentinel = Seq("__current_schema__") - - def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map { - case p if p == currentSchemaSentinel => CurrentSchemaEntry - case p => LiteralPathEntry(p) - } - - def defaultWithCurrentSchema: Seq[SessionPathEntry] = - toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel))) - - elements.flatMap { - case PathElement.DefaultPath => - defaultWithCurrentSchema - case PathElement.SystemPath => - toEntries(conf.systemPathOrder) - case PathElement.CurrentDatabase | PathElement.CurrentSchema => - Seq(CurrentSchemaEntry) - case PathElement.PathRef => - catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema) - case PathElement.SchemaInPath(parts) => - if (parts.length < 2) { - throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString(".")) - } - Seq(LiteralPathEntry(parts)) - } - } - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 5929e5c56f909..ff54b49eed3ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -167,9 +167,13 @@ case class DropFunctionCommand( identifier.funcName } - // Check if temp function exists first - if it does, allow dropping it even if a builtin - // with the same name exists (shadowing case) - if (!catalog.isTemporaryFunction(FunctionIdentifier(funcName)) && + // Keep DROP TEMPORARY FUNCTION semantics consistent for unqualified names: + // - builtin name, no temp present, no IF EXISTS => FORBIDDEN_OPERATION + // - IF EXISTS => no-op + // Qualified temp namespaces (session / system.session) always target temp functions. + if (identifier.database.isEmpty && + !ifExists && + !catalog.isTemporaryFunction(FunctionIdentifier(funcName)) && catalog.isBuiltinFunction(funcName)) { throw QueryCompilationErrors.cannotDropBuiltinFuncError(funcName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index ad6bcd414966e..18b9f6b6f3b7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -211,7 +212,7 @@ class SetPathSuite extends SharedSparkSession { }, condition = "DUPLICATE_SQL_PATH_ENTRY", sqlState = Some("42732"), - parameters = Map("pathEntry" -> "spark_catalog.default")) + parameters = Map("pathEntry" -> "current_schema")) } } @@ -231,16 +232,17 @@ class SetPathSuite extends SharedSparkSession { } } - test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") { + test("PATH enabled: literal + CURRENT_SCHEMA collision is tolerated (USE-state dependent)") { + // SET PATH only rejects static duplicates (literal-vs-literal, current_schema repeated). + // A literal that happens to match the live current_schema is not flagged: a later + // `USE SCHEMA` may make them diverge, and at lookup the first match wins anyway. + // `system.builtin` is included so `current_path()` itself remains resolvable. withPathEnabled { sql("USE spark_catalog.default") - checkError( - exception = intercept[AnalysisException] { - sql("SET PATH = spark_catalog.default, current_schema") - }, - condition = "DUPLICATE_SQL_PATH_ENTRY", - sqlState = Some("42732"), - parameters = Map("pathEntry" -> "spark_catalog.default")) + sql("SET PATH = spark_catalog.default, current_schema, system.builtin") + val entries = pathEntries(currentPath()) + assert(entries === Seq("spark_catalog.default", "spark_catalog.default", "system.builtin"), + s"Expected literal + resolved CURRENT_SCHEMA preserved; got: $entries") } } @@ -253,7 +255,7 @@ class SetPathSuite extends SharedSparkSession { }, condition = "DUPLICATE_SQL_PATH_ENTRY", sqlState = Some("42732"), - parameters = Map("pathEntry" -> "spark_catalog.default")) + parameters = Map("pathEntry" -> "current_schema")) } } @@ -568,4 +570,243 @@ class SetPathSuite extends SharedSparkSession { } } } + + // --- spark.sql.defaultPath (SQLConf.DEFAULT_PATH) --- + // The conf carries the SET PATH grammar; sessionPathEntries falls back to it lazily + // when no `SET PATH` has been issued, mirroring how `currentCatalog` falls back to + // [[SQLConf.DEFAULT_CATALOG]]. + + test("DEFAULT_PATH conf: lazy fallback when no SET PATH issued") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "spark_catalog.default, system.builtin") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + catalogManager.clearSessionPath() + try { + val entries = pathEntries(currentPath()) + assert(entries == Seq("spark_catalog.default", "system.builtin"), + s"Expected DEFAULT_PATH conf to drive current_path(); got: $entries") + assert(catalogManager.storedSessionPathEntries.isEmpty, + "DEFAULT_PATH lookup must not write to the in-memory stored session path") + } finally { + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("DEFAULT_PATH conf: explicit SET PATH overrides the conf") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "system.builtin, system.session") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + try { + sql("SET PATH = system.session, system.builtin") + val entries = pathEntries(currentPath()) + assert(entries == Seq("system.session", "system.builtin"), + s"Expected SET PATH to win over DEFAULT_PATH conf; got: $entries") + } finally { + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("DEFAULT_PATH conf: SET PATH = DEFAULT_PATH expands to the conf value") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin, current_schema") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + try { + sql("SET PATH = DEFAULT_PATH") + val entries = pathEntries(currentPath()) + assert(entries.head.contains("system.session"), + s"DEFAULT_PATH expansion should follow conf order (session first); got: $entries") + assert(catalogManager.storedSessionPathEntries.isDefined, + "After SET PATH the in-memory stored session path should be populated") + } finally { + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("DEFAULT_PATH conf: cycle break -- inner DEFAULT_PATH falls back to builtin order") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "DEFAULT_PATH", + // Pin order conf to "first" so the spark-builtin default ordering is observable. + SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + catalogManager.clearSessionPath() + try { + val entries = pathEntries(currentPath()) + assert(entries.head.contains("system.session"), + s"Inner DEFAULT_PATH should resolve to builtin order seeded by the order conf " + + s"('first' -> session leading); got: $entries") + } finally { + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("DEFAULT_PATH conf: invalid value rejected on SET spark.sql.defaultPath") { + withPathEnabled { + val e = intercept[SparkIllegalArgumentException] { + sql("SET spark.sql.defaultPath = this is not a path") + } + assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage) + } + } + + test("DEFAULT_PATH conf: PATH keyword is rejected on SET spark.sql.defaultPath") { + withPathEnabled { + val e = intercept[SparkIllegalArgumentException] { + sql("SET spark.sql.defaultPath = PATH, system.builtin") + } + assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage) + } + } + + test("DEFAULT_PATH conf: PATH disabled returns no fallback") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "false", + SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") { + val catalogManager = spark.sessionState.catalogManager + assert(catalogManager.sessionPathEntries.isEmpty, + "DEFAULT_PATH conf must not take effect when PATH is disabled") + } + } + + // --- Path-driven security check (built on the lazy DEFAULT_PATH fallback) --- + // The "block temp function shadowing builtin" check is now driven by the live PATH, so + // changes via SET PATH or DEFAULT_PATH take effect even when the legacy order conf is + // left at its default. + + test("path-driven security check: SET PATH putting session before builtin blocks temp " + + "function with a builtin name") { + withPathEnabled { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + try { + // Default `sessionFunctionResolutionOrder` is "second" (builtin first), but SET PATH + // overrides that to put session first. The security check must reflect the live path. + sql("SET PATH = system.session, system.builtin") + val e = intercept[AnalysisException] { + sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1") + } + assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage) + } finally { + sql("DROP TEMPORARY FUNCTION IF EXISTS session.count") + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("path-driven security check: DEFAULT_PATH conf putting session before builtin " + + "blocks temp function with a builtin name (no SET PATH issued)") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + catalogManager.clearSessionPath() + try { + // Order conf is left at its default ("second"). The path-driven gate must read + // DEFAULT_PATH and fire the security check for unqualified temp/builtin collisions. + val e = intercept[AnalysisException] { + sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1") + } + assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage) + } finally { + sql("DROP TEMPORARY FUNCTION IF EXISTS session.count") + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } + + test("PATH enabled: SET PATH with only user schemas does not implicitly resolve builtins") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS only_user_on_path") + try { + sql("SET PATH = spark_catalog.only_user_on_path") + val e = intercept[AnalysisException] { + sql("SELECT abs(-1)").collect() + } + assert(e.getCondition == "UNRESOLVED_ROUTINE", e.getMessage) + } finally { + sql("DROP SCHEMA IF EXISTS only_user_on_path") + } + } + } + + test("PATH enabled: explicit SET PATH with system.session AFTER a user catalog still " + + "reaches temp functions") { + // Explicit paths are honored as written: placing `system.session` after a user catalog + // is the user's authorization for unqualified temp functions to resolve. Contrast with + // the implicit (no SET PATH, no DEFAULT_PATH) form, which preserves the security property + // of the seeded default path. + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_interleaved_user") + try { + sql("CREATE TEMPORARY FUNCTION path_interleaved_temp() RETURNS INT RETURN 7") + try { + sql("SET PATH = system.builtin, spark_catalog.path_interleaved_user, system.session") + checkAnswer(sql("SELECT path_interleaved_temp()"), Row(7)) + } finally { + sql("DROP TEMPORARY FUNCTION IF EXISTS path_interleaved_temp") + } + } finally { + sql("DROP SCHEMA IF EXISTS path_interleaved_user") + } + } + } + + test("PATH enabled: SET PATH with user schema before system.builtin still resolves builtins") { + // Exercises systemFunctionKindsFromPath with a user-catalog entry preceding + // system.builtin: the helper flat-scans the path, so Builtin still appears + // in the kinds list and unqualified `abs` resolves. + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_user_before_builtin") + try { + sql("SET PATH = spark_catalog.path_user_before_builtin, system.builtin") + // `abs` is a builtin; if Builtin did not appear in the kinds list, + // unqualified `abs(-1)` would fail with UNRESOLVED_ROUTINE. + checkAnswer(sql("SELECT abs(-1)"), Row(1)) + } finally { + sql("DROP SCHEMA IF EXISTS path_user_before_builtin") + } + } + } + + test("DEFAULT_PATH conf: duplicate entries are tolerated (first-match resolution)") { + // Lookup uses first-match resolution, so redundant entries on DEFAULT_PATH are dead code + // rather than an error. (Contrast with SET PATH, which still rejects static duplicates as + // a user-input typo guard.) This avoids a UX cliff where a USE SCHEMA could later wedge + // every unqualified function lookup with DUPLICATE_SQL_PATH_ENTRY. + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.DEFAULT_PATH.key -> "system.builtin, system.builtin") { + val catalogManager = spark.sessionState.catalogManager + val priorSessionPath = catalogManager.storedSessionPathEntries + catalogManager.clearSessionPath() + try { + val entries = pathEntries(currentPath()) + assert(entries == Seq("system.builtin", "system.builtin"), + s"DEFAULT_PATH duplicates should pass through to current_path(); got: $entries") + // Sanity: unqualified resolution still works (the second `system.builtin` is dead). + checkAnswer(sql("SELECT abs(-1)"), Row(1)) + } finally { + catalogManager.clearSessionPath() + priorSessionPath.foreach(catalogManager.setSessionPath) + } + } + } }