diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/UpperTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/UpperTransform.java new file mode 100644 index 000000000000..ffdca627c116 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/UpperTransform.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; + +import java.util.List; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Upper {@link Transform}. */ +public class UpperTransform extends StringTransform { + + private static final long serialVersionUID = 1L; + + public UpperTransform(List inputs) { + super(inputs); + checkArgument(inputs.size() == 1); + } + + @Override + public BinaryString transform(List inputs) { + BinaryString string = inputs.get(0); + if (string == null) { + return null; + } + return string.toUpperCase(); + } + + @Override + public Transform copyWithNewInputs(List inputs) { + return new UpperTransform(inputs); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/UpperTransformTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/UpperTransformTest.java new file mode 100644 index 000000000000..27436d289aa1 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/UpperTransformTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class UpperTransformTest { + + @Test + public void testNullInputs() { + List inputs = new ArrayList<>(); + inputs.add(null); + UpperTransform transform = new UpperTransform(inputs); + Object result = transform.transform(GenericRow.of()); + assertThat(result).isNull(); + } + + @Test + public void testNormalInputs() { + List inputs = new ArrayList<>(); + inputs.add(BinaryString.fromString("hello")); + UpperTransform transform = new UpperTransform(inputs); + Object result = transform.transform(GenericRow.of()); + assertThat(result).isEqualTo(BinaryString.fromString("HELLO")); + } + + @Test + public void testIllegalInputs() { + List inputs = new ArrayList<>(); + inputs.add(BinaryString.fromString("hello")); + inputs.add(BinaryString.fromString("hi")); + assertThatThrownBy(() -> new UpperTransform(inputs)) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala index b32436cabab5..71bbea254c92 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkExpressionConverter.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.util import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} -import org.apache.paimon.predicate.{ConcatTransform, FieldRef, FieldTransform, Transform} +import org.apache.paimon.predicate.{ConcatTransform, FieldRef, FieldTransform, Transform, UpperTransform} import org.apache.paimon.spark.SparkTypeUtils import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType import org.apache.paimon.types.{DecimalType, RowType} @@ -34,6 +34,7 @@ object SparkExpressionConverter { // Supported transform names private val CONCAT = "CONCAT" + private val UPPER = "UPPER" /** Convert Spark [[Expression]] to Paimon [[Transform]], return None if not supported. */ def toPaimonTransform(exp: Expression, rowType: RowType): Option[Transform] = { @@ -48,6 +49,13 @@ object SparkExpressionConverter { case _ => return None } Some(new ConcatTransform(inputs.toList.asJava)) + case UPPER => + val inputs = exp.children().map { + case n: NamedReference => toPaimonFieldRef(n, rowType) + case l: Literal[_] => toPaimonLiteral(l) + case _ => return None + } + Some(new UpperTransform(inputs.toList.asJava)) case _ => None } case _ => None diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala index 5a4ff36b6c82..be2bf18bef7b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala @@ -129,6 +129,36 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase { } } + test(s"Paimon push down: apply UPPER") { + // Spark support push down UPPER since Spark 3.4. + if (gteqSpark3_4) { + withTable("t") { + sql(""" + |CREATE TABLE t (id int, value int, dt STRING) + |using paimon + |PARTITIONED BY (dt) + |""".stripMargin) + + sql(""" + |INSERT INTO t values + |(1, 100, 'hello') + |""".stripMargin) + + val q = + """ + |SELECT * FROM t + |WHERE UPPER(dt) = 'HELLO' + |""".stripMargin + assert(!checkFilterExists(q)) + + checkAnswer( + spark.sql(q), + Seq(Row(1, 100, "hello")) + ) + } + } + } + test("Paimon pushDown: limit for append-only tables with deletion vector") { withTable("dv_test") { spark.sql(