
Commit 6f593be

Hisoka-X authored and yaooqinn committed
[SPARK-43267][JDBC] Handle postgres unknown user-defined column as string in array
### What changes were proposed in this pull request?

Spark SQL currently does not support creating a DataFrame from a Postgres table that contains a user-defined array column. This PR adds support by reading such columns as strings.

### Why are the changes needed?

To support handling user-defined array columns in Spark SQL with Postgres.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

1. Added a new test.
2. Tested locally.

```sql
CREATE DOMAIN not_null_text AS TEXT DEFAULT '';

create table films
(
    code         char(5 char) not null
        constraint firstkey primary key,
    title        varchar(40 char) not null,
    did          bigint not null,
    date_prod    date,
    kind         varchar(10 char),
    tz           timestamp with time zone,
    int_arr      integer[],
    column_name  not_null_text[],
    column_name2 not_null_text
);

INSERT INTO public.films (code, title, did, date_prod, kind, tz, int_arr, column_name, column_name2)
VALUES (e'2 ', 'fdas', 1, '2023-04-07 16:05:48', '2', null, null, null, null);
INSERT INTO public.films (code, title, did, date_prod, kind, tz, int_arr, column_name, column_name2)
VALUES (e'4 ', 'fdsa', 1, '2023-04-07 16:05:48', '4', null, null, null, null);
INSERT INTO public.films (code, title, did, date_prod, kind, tz, int_arr, column_name, column_name2)
VALUES ('1 ', 'dafsdf', 1, '2023-04-04 14:43:51', '1', '2023-04-25 18:53:17.467000 +00:00', '{1,2,3}', '{1,fds,fdsa}', 'fdasfasdf');
```

Test Case

```scala
test("jdbc array") {
  val connectionProperties = new Properties()
  connectionProperties.put("user", "system")
  connectionProperties.put("password", "system")
  spark.read.jdbc(
    url = "jdbc:postgresql://localhost:54321/test?useSSL=false&serverTimezone=UTC" +
      "&useUnicode=true&characterEncoding=utf-8",
    table = "TEST.public.films",
    connectionProperties
  ).show()
}
```

Result

<img width="1444" alt="image" src="https://user-images.githubusercontent.com/32387433/234458027-e67e410b-c417-400d-be7e-431768afc0ef.png">

Closes #40953 from Hisoka-X/SPARK-43267_pg_array.

Lead-authored-by: Jia Fan <[email protected]>
Co-authored-by: Hisoka <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
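For illustration, a minimal sketch (not part of this commit) of what the new behavior looks like from the reader's side, assuming a running `SparkSession` named `spark`; the URL, credentials, and table come from the manual test above:

```scala
import java.util.Properties

// Placeholder connection details, mirroring the manual test above.
val props = new Properties()
props.put("user", "system")
props.put("password", "system")

val df = spark.read.jdbc(
  "jdbc:postgresql://localhost:54321/test", "public.films", props)

// With this patch, a user-defined array column such as not_null_text[]
// is inferred as array<string> rather than failing the read:
df.printSchema()
// ...
// |-- column_name: array (nullable = true)
// |    |-- element: string (containsNull = true)
```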
1 parent f2f6272 commit 6f593be

File tree

2 files changed, +20 -2 lines changed


connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -147,6 +147,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 12:01:02'),
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
+
+    conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
+    conn.prepareStatement("create table custom_type(type_array not_null_text[]," +
+      "type not_null_text)").executeUpdate()
+    conn.prepareStatement("INSERT INTO custom_type (type_array, type) VALUES" +
+      "('{1,fds,fdsa}','fdasfasdf')").executeUpdate()
+
   }
 
   test("Type mapping for various types") {
@@ -416,4 +423,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     val df_actual = sqlContext.read.jdbc(jdbcUrl, "timestamp_ntz_roundtrip", prop)
     assert(df_actual.collect()(0) == df_expected.collect()(0))
   }
+
+  test("SPARK-43267: user-defined column in array test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "custom_type", new Properties)
+    val row = df.collect()
+    assert(row.length === 1)
+    assert(row(0).length === 2)
+    assert(row(0).getSeq[String](0) == Seq("1", "fds", "fdsa"))
+    assert(row(0).getString(1) == "fdasfasdf")
+  }
 }
```
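As a usage note, a hedged sketch of how a row produced by this test can be consumed downstream; `getSeq` and `getString` are standard `Row` accessors, and `render` is a hypothetical helper:

```scala
import org.apache.spark.sql.Row

// Each not_null_text[] value arrives in the Row as a Seq[String].
def render(row: Row): String = {
  val arr  = row.getSeq[String](0) // type_array column, e.g. Seq("1", "fds", "fdsa")
  val text = row.getString(1)      // type column, e.g. "fdasfasdf"
  s"type_array=[${arr.mkString(", ")}], type=$text"
}
```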

sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -94,13 +94,15 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     case "numeric" | "decimal" if precision > 0 => Some(DecimalType.bounded(precision, scale))
     case "numeric" | "decimal" =>
       // SPARK-26538: handle numeric without explicit precision and scale.
-      Some(DecimalType. SYSTEM_DEFAULT)
+      Some(DecimalType.SYSTEM_DEFAULT)
     case "money" =>
       // money[] type seems to be broken and difficult to handle.
       // So this method returns None for now.
       // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/1405
       None
-    case _ => None
+    case _ =>
+      // SPARK-43267: handle unknown types in array as string, because there are user-defined types
+      Some(StringType)
   }
 
   override def convertJavaTimestampToTimestampNTZ(t: Timestamp): LocalDateTime = {
```
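For context, the `case _` above sits in the dialect's array-element mapping: Postgres reports array type names with a leading underscore (e.g. `_not_null_text`), the dialect strips it and maps the element name to a Catalyst type. A condensed, illustrative sketch of that shape (the real method handles many more type names than shown here):

```scala
import org.apache.spark.sql.types._

// Condensed sketch of PostgresDialect's element-type mapping; illustrative only.
def toCatalystType(typeName: String, precision: Int, scale: Int): Option[DataType] =
  typeName match {
    case "bool"             => Some(BooleanType)
    case "int4"             => Some(IntegerType)
    case "int8" | "oid"     => Some(LongType)
    case "text" | "varchar" => Some(StringType)
    case "numeric" | "decimal" if precision > 0 =>
      Some(DecimalType.bounded(precision, scale))
    case "numeric" | "decimal" => Some(DecimalType.SYSTEM_DEFAULT)
    case "money"               => None // money[] is broken in pgjdbc; see SPARK-34333
    case _ =>
      // SPARK-43267: unknown (e.g. user-defined) element types decode as string.
      Some(StringType)
  }

// Array columns are decoded by dropping the leading '_' from the reported
// type name and wrapping the element type, roughly:
//   toCatalystType(typeName.drop(1), size, scale).map(ArrayType(_))
```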
