
Commit d248d4c

address comments

1 parent 7c1dcc3 commit d248d4c

File tree: 8 files changed, +199 -41 lines changed

docs/sql-programming-guide.md (7 additions, 6 deletions)

```diff
@@ -1805,12 +1805,13 @@ working with timestamps in `pandas_udf`s to get the best performance, see

 - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
 - In PySpark, when Arrow optimization is enabled, `toPandas` previously just failed when Arrow optimization was unable to be used, whereas `createDataFrame` from a Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from a Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
 - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change: for self-describing file formats like Parquet and ORC, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent with respect to writing empty dataframes.
 - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
 - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.
 - Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promoting both sides to TIMESTAMP. Setting `spark.sql.hive.compareDateTimestampInTimestamp` to `false` restores the previous behavior. This option will be removed in Spark 3.0.
 - Since Spark 2.4, creating a managed table with a nonempty location is not allowed. An exception is thrown when attempting to create a managed table with a nonempty location. Setting `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` to `true` restores the previous behavior. This option will be removed in Spark 3.0.
 - Since Spark 2.4, the type coercion rules can automatically promote the argument types of variadic SQL functions (e.g., IN/COALESCE) to the widest common type, regardless of the order of the input arguments. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
+- Since Spark 2.4, `to_utc_timestamp` and `from_utc_timestamp` return null if the input timestamp string contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`. Setting `spark.sql.function.rejectTimezoneInString` to `false` restores the previous behavior. This option will be removed in Spark 3.0.

 ## Upgrading From Spark SQL 2.2 to 2.3

 - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
```
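To make the new `to_utc_timestamp`/`from_utc_timestamp` note concrete, here is a minimal sketch of the documented behavior, assuming a local Spark 2.4 session with the default `spark.sql.function.rejectTimezoneInString=true`; the session setup is illustrative and not part of this commit.

```scala
import org.apache.spark.sql.SparkSession

object RejectTzInStringDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reject-tz-demo")
      .getOrCreate()

    // Timezone-less input string: interpreted in the given zone, as before.
    spark.sql("select from_utc_timestamp('2015-07-24 00:00:00', 'PST')").show()

    // Input string with an explicit timezone part: NULL under the new default.
    spark.sql("select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST')").show()

    spark.stop()
  }
}
```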

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala (21 additions, 13 deletions)

```diff
@@ -59,7 +59,7 @@ object TypeCoercion {
       IfCoercion ::
       StackCoercion ::
       Division ::
-      ImplicitTypeCasts ::
+      new ImplicitTypeCasts(conf) ::
       DateTimeOperations ::
       WindowFrameCoercion ::
       Nil
@@ -776,27 +776,32 @@ object TypeCoercion {
   /**
    * Casts types according to the expected input types for [[Expression]]s.
    */
-  object ImplicitTypeCasts extends TypeCoercionRule {
+  class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule {
+
+    private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)
+
     override protected def coerceTypes(
         plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes who's children have not been resolved yet.
       case e if !e.childrenResolved => e

-      // Special rules for `to/from_utc_timestamp`. `to/from_utc_timestamp` assumes its input is
-      // in UTC timezone, and if input is string, it should not contain timezone.
+      // Special rules for `from/to_utc_timestamp`. These 2 functions assume the input timestamp
+      // string is in a specific timezone, so the string itself should not contain timezone.
       // TODO: We should move the type coercion logic to expressions instead of a central
       // place to put all the rules.
       case e: FromUTCTimestamp if e.left.dataType == StringType =>
-        e.copy(left = StringToTimestampWithoutTimezone(e.left))
-
-      case e: FromUTCTimestamp if e.left.dataType == DateType =>
-        e.copy(left = Cast(e.left, TimestampType))
+        if (rejectTzInString) {
+          e.copy(left = StringToTimestampWithoutTimezone(e.left))
+        } else {
+          e.copy(left = Cast(e.left, TimestampType))
+        }

       case e: ToUTCTimestamp if e.left.dataType == StringType =>
-        e.copy(left = StringToTimestampWithoutTimezone(e.left))
-
-      case e: ToUTCTimestamp if e.left.dataType == DateType =>
-        e.copy(left = Cast(e.left, TimestampType))
+        if (rejectTzInString) {
+          e.copy(left = StringToTimestampWithoutTimezone(e.left))
+        } else {
+          e.copy(left = Cast(e.left, TimestampType))
+        }

       case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
         findTightestCommonType(left.dataType, right.dataType).map { commonType =>
@@ -814,7 +819,7 @@ object TypeCoercion {
       case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
         val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
           // If we cannot do the implicit cast, just use the original input.
-          implicitCast(in, expected).getOrElse(in)
+          ImplicitTypeCasts.implicitCast(in, expected).getOrElse(in)
         }
         e.withNewChildren(children)
@@ -830,6 +835,9 @@ object TypeCoercion {
         }
         e.withNewChildren(children)
     }
+  }
+
+  object ImplicitTypeCasts {

   /**
    * Given an expected data type, try to cast the expression and return the cast expression.
```
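The control flow of the new rule is easier to see outside the diff. Below is a self-contained toy model of the rewrite, not Spark code: every name in it (`Expr`, `StrLit`, `FromUtcTs`, and so on) is invented for illustration. It mirrors how the rule wraps a string child in the strict parser when the flag is on, and falls back to the plain legacy cast otherwise.

```scala
sealed trait Expr
case class StrLit(value: String) extends Expr
case class CastToTimestamp(child: Expr) extends Expr        // legacy, lenient path
case class StringToTimestampNoTz(child: Expr) extends Expr  // strict path: tz in string => null
case class FromUtcTs(left: Expr, tz: String) extends Expr

// Mirrors the coerceTypes case for FromUTCTimestamp with a StringType child.
def coerce(e: Expr, rejectTzInString: Boolean): Expr = e match {
  case FromUtcTs(s: StrLit, tz) =>
    if (rejectTzInString) FromUtcTs(StringToTimestampNoTz(s), tz)
    else FromUtcTs(CastToTimestamp(s), tz)
  case other => other
}

// coerce(FromUtcTs(StrLit("2000-10-10 00:00:00+00:00"), "PST"), rejectTzInString = true)
// => FromUtcTs(StringToTimestampNoTz(StrLit("2000-10-10 00:00:00+00:00")), "PST")
```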

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala (5 additions, 4 deletions)

```diff
@@ -1029,11 +1029,12 @@ case class StringToTimestampWithoutTimezone(child: Expression, timeZoneId: Optio
   override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
   override def dataType: DataType = TimestampType
   override def nullable: Boolean = true
-  override def prettyName: String = "string_to_timestamp"
+  override def toString: String = child.toString
+  override def sql: String = child.sql

   override def nullSafeEval(input: Any): Any = {
     DateTimeUtils.stringToTimestamp(
-      input.asInstanceOf[UTF8String], timeZone, forceTimezone = true).orNull
+      input.asInstanceOf[UTF8String], timeZone, rejectTzInString = true).orNull
   }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -1073,7 +1074,7 @@ case class StringToTimestampWithoutTimezone(child: Expression, timeZoneId: Optio
   since = "1.5.0")
 // scalastyle:on line.size.limit
 case class FromUTCTimestamp(left: Expression, right: Expression)
-  extends BinaryExpression with ExpectsInputTypes {
+  extends BinaryExpression with ImplicitCastInputTypes {

   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
   override def dataType: DataType = TimestampType
@@ -1262,7 +1263,7 @@ case class MonthsBetween(
   since = "1.5.0")
 // scalastyle:on line.size.limit
 case class ToUTCTimestamp(left: Expression, right: Expression)
-  extends BinaryExpression with ExpectsInputTypes {
+  extends BinaryExpression with ImplicitCastInputTypes {

   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
   override def dataType: DataType = TimestampType
```
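Note that the trait swap from `ExpectsInputTypes` to `ImplicitCastInputTypes` is what lets the dedicated `DateType` cases disappear from `TypeCoercion` above: `ImplicitCastInputTypes` opts the expression into the generic `case e: ImplicitCastInputTypes` branch of the `ImplicitTypeCasts` rule, which casts each argument to the declared `inputTypes`, so a `DATE` first argument is implicitly cast to `TIMESTAMP` without a bespoke pattern match.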

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala (7 additions, 6 deletions)

```diff
@@ -296,11 +296,11 @@
    * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
    */
   def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
-    stringToTimestamp(s, defaultTimeZone(), forceTimezone = false)
+    stringToTimestamp(s, defaultTimeZone(), rejectTzInString = false)
   }

   def stringToTimestamp(s: UTF8String, timeZone: TimeZone): Option[SQLTimestamp] = {
-    stringToTimestamp(s, timeZone, forceTimezone = false)
+    stringToTimestamp(s, timeZone, rejectTzInString = false)
   }

   /**
@@ -310,13 +310,14 @@
    * @param s the input timestamp string.
    * @param timeZone the timezone of the timestamp string, will be ignored if the timestamp string
    *                 already contains timezone information and `forceTimezone` is false.
-   * @param forceTimezone if true, force to apply the given timezone to the timestamp string. If the
-   *                      timestamp string already contains timezone, return None.
+   * @param rejectTzInString if true, rejects timezone in the input string, i.e., if the
+   *                         timestamp string contains timezone, like `2000-10-10 00:00:00+00:00`,
+   *                         return None.
    */
   def stringToTimestamp(
       s: UTF8String,
       timeZone: TimeZone,
-      forceTimezone: Boolean): Option[SQLTimestamp] = {
+      rejectTzInString: Boolean): Option[SQLTimestamp] = {
     if (s == null) {
       return None
     }
@@ -434,7 +435,7 @@
       return None
     }

-    if (tz.isDefined && forceTimezone) return None
+    if (tz.isDefined && rejectTzInString) return None

     val c = if (tz.isEmpty) {
       Calendar.getInstance(timeZone)
```
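As a rough, non-authoritative sketch of the `rejectTzInString` contract, here is an approximation using `java.time` instead of Spark internals; the helper name and the simplified parsing are assumptions, and it handles far fewer formats than `DateTimeUtils.stringToTimestamp`.

```scala
import java.time.{LocalDateTime, OffsetDateTime, ZoneId}
import scala.util.Try

// A string carrying its own offset is rejected (None) when the flag is set;
// otherwise a timezone-less string is interpreted in the supplied zone.
def parseTimestampMicros(s: String, zone: ZoneId, rejectTzInString: Boolean): Option[Long] = {
  val iso = s.replace(' ', 'T')
  Try(OffsetDateTime.parse(iso)).toOption match {
    case Some(_) if rejectTzInString => None // timezone part present: reject
    case Some(odt) => Some(odt.toInstant.toEpochMilli * 1000L)
    case None =>
      Try(LocalDateTime.parse(iso)).toOption
        .map(_.atZone(zone).toInstant.toEpochMilli * 1000L)
  }
}

// parseTimestampMicros("2000-10-10 00:00:00+00:00", ZoneId.of("UTC"), rejectTzInString = true)
// => None
```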

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (7 additions, 0 deletions)

```diff
@@ -1208,6 +1208,13 @@
     .stringConf
     .createWithDefault("")

+  val REJECT_TIMEZONE_IN_STRING = buildConf("spark.sql.function.rejectTimezoneInString")
+    .internal()
+    .doc("If true, `to_utc_timestamp` and `from_utc_timestamp` return null if the input string " +
+      "contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`.")
+    .booleanConf
+    .createWithDefault(true)
+
   object PartitionOverwriteMode extends Enumeration {
     val STATIC, DYNAMIC = Value
   }
```
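Because the flag is registered through `buildConf`, it can be flipped at runtime like any other SQL conf. A brief hypothetical session, assuming a live `SparkSession` named `spark`:

```scala
// Either form restores the pre-2.4 behavior; note the option is internal
// and, per the migration guide above, will be removed in Spark 3.0.
spark.conf.set("spark.sql.function.rejectTimezoneInString", "false")
spark.sql("SET spark.sql.function.rejectTimezoneInString=false")

// With the flag off, a timezone-bearing string is cast instead of rejected.
spark.sql("select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST')").show()
```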

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala (6 additions, 6 deletions)

```diff
@@ -524,23 +524,23 @@ class TypeCoercionSuite extends AnalysisTest {
   test("cast NullType for expressions that implement ExpectsInputTypes") {
     import TypeCoercionSuite._

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
       AnyTypeUnaryExpression(Literal.create(null, NullType)),
       AnyTypeUnaryExpression(Literal.create(null, NullType)))

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
       NumericTypeUnaryExpression(Literal.create(null, NullType)),
       NumericTypeUnaryExpression(Literal.create(null, DoubleType)))
   }

   test("cast NullType for binary operators") {
     import TypeCoercionSuite._

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
       AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
       AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)))

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
       NumericTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
       NumericTypeBinaryOperator(Literal.create(null, DoubleType), Literal.create(null, DoubleType)))
   }
@@ -823,7 +823,7 @@ class TypeCoercionSuite extends AnalysisTest {
   }

   test("type coercion for CaseKeyWhen") {
-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
       CaseKeyWhen(Literal(1.toShort), Seq(Literal(1), Literal("a"))),
       CaseKeyWhen(Cast(Literal(1.toShort), IntegerType), Seq(Literal(1), Literal("a")))
     )
@@ -1275,7 +1275,7 @@ class TypeCoercionSuite extends AnalysisTest {
   }

   test("SPARK-17117 null type coercion in divide") {
-    val rules = Seq(FunctionArgumentConversion, Division, ImplicitTypeCasts)
+    val rules = Seq(FunctionArgumentConversion, Division, new ImplicitTypeCasts(conf))
     val nullLit = Literal.create(null, NullType)
     ruleTest(rules, Divide(1L, nullLit), Divide(Cast(1L, DoubleType), Cast(nullLit, DoubleType)))
     ruleTest(rules, Divide(nullLit, 1L), Divide(Cast(nullLit, DoubleType), Cast(1L, DoubleType)))
```

sql/core/src/test/resources/sql-tests/inputs/datetime.sql (28 additions, 0 deletions)

```diff
@@ -28,6 +28,34 @@ select a, b from ttf2 order by a, current_date;

 select weekday('2007-02-03'), weekday('2009-07-30'), weekday('2017-05-27'), weekday(null), weekday('1582-10-15 13:10:15');

+select from_utc_timestamp('2015-07-24 00:00:00', 'PST');
+
+select from_utc_timestamp('2015-01-24 00:00:00', 'PST');
+
+select from_utc_timestamp(null, 'PST');
+
+select from_utc_timestamp('2015-07-24 00:00:00', null);
+
+select from_utc_timestamp(null, null);
+
+select from_utc_timestamp(cast(0 as timestamp), 'PST');
+
+select from_utc_timestamp(cast('2015-01-24' as date), 'PST');
+
+select to_utc_timestamp('2015-07-24 00:00:00', 'PST');
+
+select to_utc_timestamp('2015-01-24 00:00:00', 'PST');
+
+select to_utc_timestamp(null, 'PST');
+
+select to_utc_timestamp('2015-07-24 00:00:00', null);
+
+select to_utc_timestamp(null, null);
+
+select to_utc_timestamp(cast(0 as timestamp), 'PST');
+
+select to_utc_timestamp(cast('2015-01-24' as date), 'PST');
+
 -- SPARK-23715: the input of to/from_utc_timestamp can not have timezone
 select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');
```
