From 40decc66e11c85024f542513f153c0ae74af4793 Mon Sep 17 00:00:00 2001
From: Seth Fitzsimmons
Date: Tue, 11 Jun 2019 18:16:24 -0700
Subject: [PATCH 1/3] Map ByteType to SMALLINT

PostgreSQL doesn't have TINYINT, which would map directly, but SMALLINTs
are sufficient for uni-directional translation.
---
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala    | 2 +-
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 5be45c973a5f..e7b9dea7be35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -74,13 +74,13 @@ private object PostgresDialect extends JdbcDialect {
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
     case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
       getJDBCType(et).map(_.databaseTypeDefinition)
         .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
         .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
-    case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5f27e75addcf..0523528e98da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -857,10 +857,7 @@ class JDBCSuite extends QueryTest
       Some(ArrayType(DecimalType.SYSTEM_DEFAULT)))
     assert(Postgres.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT4")
     assert(Postgres.getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "FLOAT8")
-    val errMsg = intercept[IllegalArgumentException] {
-      Postgres.getJDBCType(ByteType)
-    }
-    assert(errMsg.getMessage contains "Unsupported type in postgresql: ByteType")
+    assert(Postgres.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
   }
 
   test("DerbyDialect jdbc type mapping") {

From 50cb99a6983aa810c308f957f1f4d5eea76c81de Mon Sep 17 00:00:00 2001
From: Seth Fitzsimmons
Date: Tue, 18 Jun 2019 13:01:53 -0700
Subject: [PATCH 2/3] Merge conditions

---
 .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index e7b9dea7be35..2645e4c9d528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -73,8 +73,7 @@ private object PostgresDialect extends JdbcDialect {
     case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
-    case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
-    case ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>

From 67713ee86607db7356fd3bf6adee1f1579e39c6f Mon Sep 17 00:00:00 2001
From: Seth Fitzsimmons
Date: Wed, 17 Jul 2019 12:52:01 -0700
Subject: [PATCH 3/3] Add integration test

---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 7caf3d6ba59f..364ddddc2f80 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -206,4 +206,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       """.stripMargin.replaceAll("\n", " "))
     assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
   }
+
+  test("write byte as smallint") {
+    sqlContext.createDataFrame(Seq((1.toByte, 2.toShort)))
+      .write.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+    val df = sqlContext.read.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+    val schema = df.schema
+    assert(schema.head.dataType == ShortType)
+    assert(schema(1).dataType == ShortType)
+    val rows = df.collect()
+    assert(rows.length === 1)
+    assert(rows(0).getShort(0) === 1)
+    assert(rows(0).getShort(1) === 2)
+  }
 }
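To illustrate the behavior this series enables, here is a minimal standalone sketch, not part of the patch itself, assuming a local PostgreSQL instance; the connection URL, table name, and object name are hypothetical. It shows why the first commit message calls the translation "uni-directional": ByteType columns are written as SMALLINT, but reading the table back yields ShortType, since Postgres SMALLINT has no ByteType counterpart on the read path.

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.ShortType

object ByteToSmallintSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical connection URL and table name, for illustration only.
    val url = "jdbc:postgresql://localhost:5432/testdb"
    val table = "byte_roundtrip"

    // With PATCH 1/2 applied, PostgresDialect maps ByteType to SMALLINT
    // instead of throwing IllegalArgumentException on write.
    Seq(1.toByte, 2.toByte).toDF("b").write.jdbc(url, table, new Properties)

    // Postgres SMALLINT reads back as Spark ShortType, not ByteType,
    // so the mapping is uni-directional.
    val df = spark.read.jdbc(url, table, new Properties)
    assert(df.schema.head.dataType == ShortType)

    spark.stop()
  }
}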