Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,26 @@ trait SparkDateTimeUtils {
Math.floorMod(v.epochMicros, MICROS_PER_DAY) * NANOS_PER_MICROS + v.nanosWithinMicro
}

/**
* Extracts the time-of-day component (nanoseconds since midnight) from a `TIMESTAMP_LTZ`
* microsecond value. `TIMESTAMP_LTZ` denotes an absolute instant, so its time-of-day is the
* local wall-clock time observed in the session time zone `zoneId`. The result stays in
* `[0, NANOS_PER_DAY)`.
*/
def timestampToNanosOfDay(micros: Long, zoneId: ZoneId): Long = {
getLocalDateTime(micros, zoneId).toLocalTime.toNanoOfDay
}

/**
* Extracts the time-of-day component (nanoseconds since midnight) from a nanosecond-precision
* `TIMESTAMP_LTZ` value. `TIMESTAMP_LTZ` denotes an absolute instant, so its time-of-day is the
* local wall-clock time observed in the session time zone `zoneId`. The sub-microsecond digits
* carried in `nanosWithinMicro` are preserved, and the result stays in `[0, NANOS_PER_DAY)`.
*/
def timestampLTZNanosToNanosOfDay(v: TimestampNanosVal, zoneId: ZoneId): Long = {
timestampNanosToInstant(v).atZone(zoneId).toLocalTime.toNanoOfDay
}

/**
* Converts a local date at the default JVM time zone to the number of days since 1970-01-01 in
* the hybrid calendar (Julian + Gregorian) by discarding the time part. The resulted days are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,16 @@ object Cast extends QueryErrorsBase {
case (_: TimeType, _: TimeType) => true
case (_: TimeType, _: IntegralType) => true

// TIME(p) <-> TIMESTAMP_NTZ(q), q in [6, 9] (precision 6 is the micro TimestampNTZType,
// [7, 9] is TimestampNTZNanosType). Restricted to the NTZ family on purpose: TIMESTAMP_LTZ
// is not a valid counterpart for these casts.
// TIME(p) <-> TIMESTAMP_NTZ(q) / TIMESTAMP_LTZ(q), q in [6, 9] (precision 6 is the micro
// TimestampNTZType / TimestampType, [7, 9] is TimestampNTZNanosType / TimestampLTZNanosType).
case (_: TimeType, TimestampNTZType) => true
case (TimestampNTZType, _: TimeType) => true
case (_: TimeType, _: TimestampNTZNanosType) => true
case (_: TimestampNTZNanosType, _: TimeType) => true
case (_: TimeType, TimestampType) => true
case (TimestampType, _: TimeType) => true
case (_: TimeType, _: TimestampLTZNanosType) => true
case (_: TimestampLTZNanosType, _: TimeType) => true

// non-null variants can generate nulls even in ANSI mode
case (ArrayType(fromType, fn), ArrayType(toType, tn)) =>
Expand Down Expand Up @@ -319,13 +322,16 @@ object Cast extends QueryErrorsBase {
case (_: TimeType, _: TimeType) => true
case (_: TimeType, _: IntegralType) => true

// TIME(p) <-> TIMESTAMP_NTZ(q), q in [6, 9] (precision 6 is the micro TimestampNTZType,
// [7, 9] is TimestampNTZNanosType). Restricted to the NTZ family on purpose: TIMESTAMP_LTZ
// is not a valid counterpart for these casts.
// TIME(p) <-> TIMESTAMP_NTZ(q) / TIMESTAMP_LTZ(q), q in [6, 9] (precision 6 is the micro
// TimestampNTZType / TimestampType, [7, 9] is TimestampNTZNanosType / TimestampLTZNanosType).
case (_: TimeType, TimestampNTZType) => true
case (TimestampNTZType, _: TimeType) => true
case (_: TimeType, _: TimestampNTZNanosType) => true
case (_: TimestampNTZNanosType, _: TimeType) => true
case (_: TimeType, TimestampType) => true
case (TimestampType, _: TimeType) => true
case (_: TimeType, _: TimestampLTZNanosType) => true
case (_: TimestampLTZNanosType, _: TimeType) => true

case (ArrayType(fromType, fn), ArrayType(toType, tn)) =>
canCast(fromType, toType) &&
Expand Down Expand Up @@ -371,7 +377,9 @@ object Cast extends QueryErrorsBase {
* with it: a conversion needs the session time zone exactly when its `castTo*` / `castTo*Code`
* path reads `zoneId`. This is principally the string/date casts of the LTZ timestamp families
* (TIMESTAMP and TIMESTAMP_LTZ(p)), the cross-family TIMESTAMP_LTZ <-> TIMESTAMP_NTZ conversions
* (micro and nanosecond), and TIME -> TIMESTAMP_NTZ (whose date fields come from CURRENT_DATE).
* (micro and nanosecond), TIME -> TIMESTAMP_NTZ (whose date fields come from CURRENT_DATE), and
* both directions of TIME <-> TIMESTAMP_LTZ (the LTZ value is an absolute instant, so extracting
* its time-of-day and attaching CURRENT_DATE to a TIME both depend on the session time zone).
*/
def needsTimeZone(from: DataType, to: DataType): Boolean = (from, to) match {
case (VariantType, _) => true
Expand Down Expand Up @@ -404,6 +412,14 @@ object Cast extends QueryErrorsBase {
// time-of-day and is intentionally zone-independent, so it is absent here.
case (_: TimeType, TimestampNTZType) => true
case (_: TimeType, _: TimestampNTZNanosType) => true
// TIME <-> TIMESTAMP_LTZ depends on the session time zone in both directions: the LTZ value is
// an absolute instant, so its time-of-day is the local wall clock observed in the session zone,
// and TIME -> TIMESTAMP_LTZ attaches CURRENT_DATE (resolved in the session zone) and converts
// the resulting local date-time to an instant in that zone.
case (_: TimeType, TimestampType) => true
case (TimestampType, _: TimeType) => true
case (_: TimeType, _: TimestampLTZNanosType) => true
case (_: TimestampLTZNanosType, _: TimeType) => true
case (ArrayType(fromType, _), ArrayType(toType, _)) => needsTimeZone(fromType, toType)
case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
needsTimeZone(fromKey, toKey) || needsTimeZone(fromValue, toValue)
Expand All @@ -428,6 +444,19 @@ object Cast extends QueryErrorsBase {
case _ => false
}

/**
* Returns true for a cast from `TIME(p)` to `TIMESTAMP_LTZ(q)` (q in [6, 9]; q=6 is the micro
* `TimestampType`, [7, 9] is `TimestampLTZNanosType`). Like the NTZ counterpart, such casts
* derive their date fields from `CURRENT_DATE`, so they are stabilized within a query by
* [[org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime]], which scans `CAST` nodes and
* uses this predicate on the resolved plan.
*/
def isTimeToTimestampLTZ(from: DataType, to: DataType): Boolean = (from, to) match {
case (_: TimeType, TimestampType) => true
case (_: TimeType, _: TimestampLTZNanosType) => true
case _ => false
}

/**
* Returns true iff we can safely up-cast the `from` type to `to` type without any truncating or
* precision lose or possible runtime failures. For example, long -> int, string -> int are not
Expand Down Expand Up @@ -882,6 +911,14 @@ case class Cast(
} else {
buildCast[Float](_, f => doubleToTimestamp(f.toDouble))
}
case _: TimeType =>
// Per ANSI, the date fields come from CURRENT_DATE (resolved in the session time zone), and
// the resulting local date-time is converted to an instant in that zone. In a real query
// plan ComputeCurrentTime rewrites this cast with a query-stable date literal; this eval
// path is the fallback for direct expression evaluation and reads the current date in the
// session time zone.
buildCast[Long](_, nanos =>
DateTimeUtils.makeTimestamp(currentDate(zoneId), nanos, zoneId))
}

private[this] def castToTimestampNTZ(from: DataType): Any => Any = from match {
Expand Down Expand Up @@ -932,6 +969,12 @@ case class Cast(
DateTimeUtils.timestampNTZNanosToLTZNanos(v, zoneId, precision))
case DateType =>
buildCast[Int](_, d => TimestampNanosVal.fromParts(daysToMicros(d, zoneId), 0.toShort))
case _: TimeType =>
// See castToTimestamp: the date fields come from CURRENT_DATE in the session time zone, with
// the query-stable rewrite handled by ComputeCurrentTime. The sub-microsecond digits of the
// TIME value are preserved up to the target precision.
buildCast[Long](_, nanos =>
DateTimeUtils.makeTimestampLTZNanos(currentDate(zoneId), nanos, precision, zoneId))
}

private[this] def castToTimestampNTZNanos(
Expand Down Expand Up @@ -1024,6 +1067,14 @@ case class Cast(
case _: TimestampNTZNanosType =>
buildCast[TimestampNanosVal](_, v => DateTimeUtils.truncateTimeToPrecision(
DateTimeUtils.timestampNTZNanosToNanosOfDay(v), to.precision))
case TimestampType =>
// TIMESTAMP_LTZ is an absolute instant; its time-of-day is the local wall clock observed in
// the session time zone.
buildCast[Long](_, micros => DateTimeUtils.truncateTimeToPrecision(
DateTimeUtils.timestampToNanosOfDay(micros, zoneId), to.precision))
case _: TimestampLTZNanosType =>
buildCast[TimestampNanosVal](_, v => DateTimeUtils.truncateTimeToPrecision(
DateTimeUtils.timestampLTZNanosToNanosOfDay(v, zoneId), to.precision))
// Unreachable for valid casts: `canCast(_, TimeType)` only allows the source types handled
// above (and NullType is short-circuited in castInternal). Fail fast to keep the interpreted
// and codegen (castToTimeCode) paths consistent if a future canCast arm is added without a
Expand Down Expand Up @@ -1761,6 +1812,20 @@ case class Cast(
$evPrim = $dateTimeUtilsCls.truncateTimeToPrecision(
$dateTimeUtilsCls.timestampNTZNanosToNanosOfDay($c), ${to.precision});
"""
case TimestampType =>
val zid = zoneIdValue(ctx)
(c, evPrim, _) =>
code"""
$evPrim = $dateTimeUtilsCls.truncateTimeToPrecision(
$dateTimeUtilsCls.timestampToNanosOfDay($c, $zid), ${to.precision});
"""
case _: TimestampLTZNanosType =>
val zid = zoneIdValue(ctx)
(c, evPrim, _) =>
code"""
$evPrim = $dateTimeUtilsCls.truncateTimeToPrecision(
$dateTimeUtilsCls.timestampLTZNanosToNanosOfDay($c, $zid), ${to.precision});
"""
// Unreachable for valid casts (see castToTime). Fail fast at codegen time instead of
// silently emitting a null, matching the interpreted path.
case _ =>
Expand Down Expand Up @@ -1967,6 +2032,11 @@ case class Cast(
}
"""
}
case _: TimeType =>
val zid = zoneIdValue(ctx)
(c, evPrim, evNull) =>
code"$evPrim = $dateTimeUtilsCls.makeTimestamp(" +
code"$dateTimeUtilsCls.currentDate($zid), $c, $zid);"
}

private[this] def castToTimestampNTZCode(
Expand Down Expand Up @@ -2054,6 +2124,11 @@ case class Cast(
(c, evPrim, evNull) =>
code"$evPrim = TimestampNanosVal.fromParts(" +
code"$dateTimeUtilsCls.daysToMicros($c, $zid), (short) 0);"
case _: TimeType =>
val zid = zoneIdValue(ctx)
(c, evPrim, evNull) =>
code"$evPrim = $dateTimeUtilsCls.makeTimestampLTZNanos(" +
code"$dateTimeUtilsCls.currentDate($zid), $c, $precision, $zid);"
}

private[this] def castToTimestampNTZNanosCode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3066,6 +3066,92 @@ case class MakeTimestampNTZNanos(left: Expression, right: Expression, precision:
}
}

/**

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new MakeTimestampLTZ / MakeTimestampLTZNanos both mix RuntimeReplaceable (nodePatterns=Seq(RUNTIME_REPLACEABLE)) and TimeZoneAwareExpression, whose final override val nodePatterns = Seq(TIME_ZONE_AWARE_EXPRESSION) ++ nodePatternsInternal() WINS in linearization and silently DROPS RUNTIME_REPLACEABLE from the pattern set. Every other both-traits expression works around this by overriding nodePatternsInternal() = Seq(RUNTIME_REPLACEABLE) (verified: ParseToDate L2258, ParseToTimestamp L2372, MakeTimestampFromDateTime L3341); the new expressions do NOT. The merged NTZ sibling MakeTimestampNTZ sidesteps it by not mixing TimeZoneAwareExpression at all — so there is no NTZ precedent excusing the omission.

NOT a functional bug today (ComputeCurrentTime emits .replacement directly, bypassing ReplaceExpressions, and these are created after ReplaceExpressions runs), but a real maintainability divergence that could become a live bug if ReplaceExpressions ever re-runs after ComputeCurrentTime.

Fix: one-line override def nodePatternsInternal() = Seq(RUNTIME_REPLACEABLE) on each.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On MakeTimestampNTZNanos — I don't think it needs the override: unlike the two LTZ builders it does not mix in TimeZoneAwareExpression (it's BinaryExpression with RuntimeReplaceable with ExpectsInputTypes). So there's no final override val nodePatterns from TimeZoneAwareExpression to swallow the pattern — it inherits Seq(RUNTIME_REPLACEABLE) straight from the RuntimeReplaceable trait, and nodePatterns already contains it. Adding the override there would be a redundant no-op. The issue is specifically the RuntimeReplaceable + TimeZoneAwareExpression combination, which is why only the LTZ builders (and the existing ParseToDate / ParseToTimestamp / MakeTimestampFromDateTime) need it. Happy to add it anyway if you'd prefer it purely for visual symmetry, but I'd lean toward leaving it as-is to avoid implying a dependency that isn't there.

* Creates a `TIMESTAMP_LTZ` (micro `TimestampType`) from a date and a local time interpreted in the
* session time zone. This is the LTZ counterpart of [[MakeTimestampNTZ]]; because the result is an
* absolute instant it is time-zone aware. It is an internal expression used by
* [[org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime]] to rewrite
* `CAST(time AS TIMESTAMP_LTZ)` with a query-stable current date; it is not registered as a SQL
* function.
*/
case class MakeTimestampLTZ(
left: Expression,
right: Expression,
timeZoneId: Option[String] = None)
extends BinaryExpression
with RuntimeReplaceable
with ExpectsInputTypes
with TimeZoneAwareExpression {

override lazy val replacement: Expression = StaticInvoke(
classOf[DateTimeUtils.type],
TimestampType,
"makeTimestamp",
Seq(left, right, Literal.create(zoneId.getId, StringType)),
Seq(left.dataType, right.dataType, StringType)
)

override def inputTypes: Seq[AbstractDataType] = Seq(DateType, AnyTimeType)

override def prettyName: String = "make_timestamp_ltz"

// TimeZoneAwareExpression's `final override val nodePatterns` wins in linearization and would
// otherwise drop RUNTIME_REPLACEABLE; re-add it via this hook, mirroring the other
// RuntimeReplaceable + TimeZoneAwareExpression siblings (e.g. ParseToDate, ParseToTimestamp).
override def nodePatternsInternal(): Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression = {
copy(left = newLeft, right = newRight)
}
}

/**
* Creates a nanosecond-precision `TIMESTAMP_LTZ(precision)` (precision in [7, 9]) from a date and a
* local time interpreted in the session time zone, preserving the time's sub-microsecond digits up
* to `precision`. This is the nanosecond, time-zone aware counterpart of [[MakeTimestampLTZ]]. It
* is an internal expression used by [[org.apache.spark.sql.catalyst.optimizer.ComputeCurrentTime]]
* to rewrite `CAST(time AS TIMESTAMP_LTZ(precision))` with a query-stable current date; it is not
* registered as a SQL function.
*/
case class MakeTimestampLTZNanos(
left: Expression,
right: Expression,
precision: Int,
timeZoneId: Option[String] = None)
extends BinaryExpression
with RuntimeReplaceable
with ExpectsInputTypes
with TimeZoneAwareExpression {

override lazy val replacement: Expression = StaticInvoke(
classOf[DateTimeUtils.type],
TimestampLTZNanosType(precision),
"makeTimestampLTZNanos",
Seq(left, right, Literal(precision), Literal.create(zoneId.getId, StringType)),
Seq(left.dataType, right.dataType, IntegerType, StringType)
)

override def inputTypes: Seq[AbstractDataType] = Seq(DateType, AnyTimeType)

override def prettyName: String = "make_timestamp_ltz_nanos"

// See MakeTimestampLTZ: re-add RUNTIME_REPLACEABLE that TimeZoneAwareExpression's final
// nodePatterns would otherwise drop in linearization.
override def nodePatternsInternal(): Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): Expression = {
copy(left = newLeft, right = newRight)
}
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal]
val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal]

// The CAST bit is included so this rule can find TIME -> TIMESTAMP_NTZ casts (which depend on
// CURRENT_DATE) and stabilize them below. CAST is a broad pattern, so this widens the rule's
// traversal to most plans; the precise `Cast.isTimeToTimestampNTZ` guard keeps the rewrite
// The CAST bit is included so this rule can find TIME -> TIMESTAMP_NTZ and TIME ->
// TIMESTAMP_LTZ casts (which derive their date fields from CURRENT_DATE) and stabilize them
// below. CAST is a broad pattern, so this widens the rule's traversal to most plans; the
// precise `Cast.isTimeToTimestampNTZ` / `Cast.isTimeToTimestampLTZ` guards keep the rewrite
// scoped. We intentionally do not tag these casts with CURRENT_LIKE instead: inline-table
// validation treats CURRENT_LIKE as safe to defer, so tagging would let unrelated non-foldable
// NTZ-target casts (e.g. CAST(rand() AS TIMESTAMP_NTZ)) bypass that validation (see SPARK-57618
// and ResolveInlineTablesSuite).
// NTZ/LTZ-target casts (e.g. CAST(rand() AS TIMESTAMP_NTZ)) bypass that validation (see
// SPARK-57618 and ResolveInlineTablesSuite).
def transformCondition(treePatternbits: TreePatternBits): Boolean = {
treePatternbits.containsPattern(CURRENT_LIKE) || treePatternbits.containsPattern(CAST)
}
Expand Down Expand Up @@ -160,6 +161,25 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
throw SparkException.internalError(
s"Unexpected target type in TIME -> TIMESTAMP_NTZ rewrite: $other")
}
// CAST(time AS TIMESTAMP_LTZ(q)) likewise fills the date fields from CURRENT_DATE.
// Rewrite it to a zone-aware date+time builder anchored on the same query-stable current
// date literal, so all references agree within the query.
case c: Cast if Cast.isTimeToTimestampLTZ(c.child.dataType, c.dataType) =>
val dateLit = currentDates.getOrElseUpdate(c.zoneId, {
Literal.create(
DateTimeUtils.microsToDays(currentTimestampMicros, c.zoneId), DateType)
})
c.dataType match {
case l: TimestampLTZNanosType =>
MakeTimestampLTZNanos(dateLit, c.child, l.precision, c.timeZoneId).replacement
case _: TimestampType =>
MakeTimestampLTZ(dateLit, c.child, c.timeZoneId).replacement
case other =>
// Unreachable: the outer guard `Cast.isTimeToTimestampLTZ` only matches the micro
// TimestampType and the nanosecond TimestampLTZNanosType targets.
throw SparkException.internalError(
s"Unexpected target type in TIME -> TIMESTAMP_LTZ rewrite: $other")
}
case currentTimeType : CurrentTime =>
val truncatedTime = truncateTimeToPrecision(currentTimeOfDayNanos,
currentTimeType.precision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,42 @@ object DateTimeUtils extends SparkDateTimeUtils {
makeTimestamp(days, nanos, zoneId)
}

/**
* Makes a nanosecond-precision `TIMESTAMP_LTZ(precision)` (precision in [7, 9]) from a date and
* a local time interpreted in the time zone `zoneId`, preserving the time's sub-microsecond
* digits up to `precision`. This is the nanosecond counterpart of
* [[makeTimestamp(days:Int,nanos:Long,zoneId:java\.time\.ZoneId)*]].
*
* @param days The number of days since the epoch 1970-01-01.
* Negative numbers represent earlier days.
* @param nanos The number of nanoseconds within the day since midnight.
* @param precision The fractional-second precision of the target `TIMESTAMP_LTZ(precision)`.
* @param zoneId The time zone ID at which the operation is performed.
* @return The composite `(epochMicros, nanosWithinMicro)` pair since the epoch
* 1970-01-01 00:00:00Z.
*/
def makeTimestampLTZNanos(
days: Int,
nanos: Long,
precision: Int,
zoneId: ZoneId): TimestampNanosVal = {
val ldt = LocalDateTime.of(daysToLocalDate(days), nanosToLocalTime(nanos))
instantToTimestampNanos(ldt.atZone(zoneId).toInstant, precision)
}

/**
* Makes a nanosecond-precision `TIMESTAMP_LTZ(precision)` from a date and a local time with a
* time zone string. Used by the `CAST(time AS TIMESTAMP_LTZ(precision))` rewrite, which embeds
* the resolved session time zone as a string literal.
*/
def makeTimestampLTZNanos(
days: Int,
nanos: Long,
precision: Int,
timezone: UTF8String): TimestampNanosVal = {
makeTimestampLTZNanos(days, nanos, precision, getZoneId(timezone.toString))
}

/**
* Adds a day-time interval to a time.
*
Expand Down
Loading