From bc17a9e97a7175e91e82280f0cc0d383e9e94b3e Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Tue, 31 May 2022 21:05:44 +0200
Subject: [PATCH 1/7] udt support with mllib example :)

---
 examples/pom-3.2_2.12.xml                     |  5 ++
 .../kotlinx/spark/examples/Correlation.kt     | 64 +++++++++++++++++++
 .../jetbrains/kotlinx/spark/api/Encoding.kt   | 16 +++++
 3 files changed, 85 insertions(+)
 create mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt

diff --git a/examples/pom-3.2_2.12.xml b/examples/pom-3.2_2.12.xml
index 181a2f73..92fba9f2 100644
--- a/examples/pom-3.2_2.12.xml
+++ b/examples/pom-3.2_2.12.xml
@@ -29,6 +29,11 @@
             <artifactId>spark-streaming_${scala.compat.version}</artifactId>
             <version>${spark3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-10_${scala.compat.version}</artifactId>
diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
new file mode 100644
index 00000000..fb1367e9
--- /dev/null
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
@@ -0,0 +1,64 @@
+/*-
+ * =LICENSE=
+ * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
+ * ----------
+ * Copyright (C) 2019 - 2022 JetBrains
+ * ----------
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.examples + +import org.apache.spark.ml.linalg.Matrix +import org.apache.spark.ml.linalg.VectorUDT +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.stat.Correlation +import org.apache.spark.sql.Row +import org.apache.spark.sql.RowFactory +import org.apache.spark.sql.types.Metadata +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StructType +import org.jetbrains.kotlinx.spark.api.showDS +import org.jetbrains.kotlinx.spark.api.tuples.* +import org.jetbrains.kotlinx.spark.api.withSpark + + +fun main() = withSpark { + val data = listOf( + Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(1.0, -2.0)), + Vectors.dense(4.0, 5.0, 0.0, 3.0), + Vectors.dense(6.0, 7.0, 0.0, 8.0), + Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(9.0, 1.0)) + ).map(::tupleOf) + + val df = data.toDS() + df.showDS() + + val r1: Matrix = Correlation.corr(df, "_1").head().getAs(0) + println( + """ + |Pearson correlation matrix: + |$r1 + | + """.trimMargin() + ) + + val r2: Matrix = Correlation.corr(df, "_1", "spearman").head().getAs(0) + println( + """ + |Spearman correlation matrix: + |$r2 + | + """.trimMargin() + ) +} \ No newline at end of file diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt index eafd460f..882b7192 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt @@ -72,6 +72,7 @@ import kotlin.reflect.* import kotlin.reflect.full.findAnnotation import kotlin.reflect.full.isSubclassOf import kotlin.reflect.full.primaryConstructor +import kotlin.reflect.jvm.jvmName import kotlin.to @JvmField @@ -308,6 +309,21 @@ fun schema(type: KType, map: Map = mapOf()): DataType { ) } + UDTRegistration.exists(klass.jvmName) -> { + @Suppress("UNCHECKED_CAST") + val dataType = UDTRegistration.getUDTFor(klass.jvmName) + .getOrNull()!! 
+                .let { it as Class<UserDefinedType<*>> }
+                .getConstructor()
+                .newInstance()
+
+            KSimpleTypeWrapper(
+                /* dt = */ dataType,
+                /* cls = */ klass.java,
+                /* nullable = */ false
+            )
+        }
+
         else -> throw IllegalArgumentException("$type is unsupported")
     }
 }

From 63d1368ff78f78d8a2be6ca0ee8832e5990c904f Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Tue, 31 May 2022 21:39:28 +0200
Subject: [PATCH 2/7] udt registration also works

---
 .../kotlinx/spark/examples/Correlation.kt     |  1 -
 .../kotlinx/spark/examples/UdtRegistration.kt | 73 +++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
 create mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
index fb1367e9..aa3b8b94 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
@@ -42,7 +42,6 @@ fun main() = withSpark {
     ).map(::tupleOf)
 
     val df = data.toDS()
-    df.showDS()
 
     val r1: Matrix = Correlation.corr(df, "_1").head().getAs(0)
     println(
diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
new file mode 100644
index 00000000..e3d98dc6
--- /dev/null
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
@@ -0,0 +1,73 @@
+import org.apache.hadoop.shaded.com.google.common.base.MoreObjects
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.*
+import org.apache.spark.unsafe.types.UTF8String
+import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.tupleOf
+import java.io.Serializable
+import kotlin.reflect.jvm.jvmName
+
+class CityUserDefinedType : UserDefinedType<City>() {
+
+    override fun sqlType(): DataType = DATA_TYPE
+
+    override fun serialize(city: City): InternalRow = GenericInternalRow(2).apply {
+        setInt(DEPT_NUMBER_INDEX, city.departmentNumber)
+        update(NAME_INDEX, UTF8String.fromString(city.name))
+    }
+
+    override fun deserialize(datum: Any): City =
+        if (datum is InternalRow)
+            City(
+                name = datum.getString(NAME_INDEX),
+                departmentNumber = datum.getInt(DEPT_NUMBER_INDEX),
+            )
+        else throw IllegalStateException("Unsupported conversion")
+
+    override fun userClass(): Class<City> = City::class.java
+
+    companion object {
+        private const val DEPT_NUMBER_INDEX = 0
+        private const val NAME_INDEX = 1
+        private val DATA_TYPE = DataTypes.createStructType(
+            arrayOf(
+                DataTypes.createStructField(
+                    "departmentNumber",
+                    DataTypes.IntegerType,
+                    false,
+                    MetadataBuilder().putLong("maxNumber", 99).build(),
+                ),
+                DataTypes.createStructField("name", DataTypes.StringType, false)
+            )
+        )
+    }
+}
+
+@SQLUserDefinedType(udt = CityUserDefinedType::class)
+class City(
+    val name: String,
+    val departmentNumber: Int,
+) : Serializable {
+
+    override fun toString(): String =
+        MoreObjects
+            .toStringHelper(this)
+            .add("name", name)
+            .add("departmentNumber", departmentNumber)
+            .toString()
+}
+
+fun main() = withSpark {
+
+    UDTRegistration.register(City::class.jvmName, CityUserDefinedType::class.jvmName)
+
+    val items = listOf(
+        City("Amsterdam", 1),
+        City("Breda", 2),
+        City("Oosterhout", 3),
+    ).map(::tupleOf)
+
+    val ds = items.toDS()
+    ds.showDS()
+}
\ No newline at end of file
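With this patch applied, a UDT-backed class round-trips through a typed Dataset. The following is a minimal sketch, not part of the patch; it assumes the City and CityUserDefinedType classes from UdtRegistration.kt above, and the register call must run before the first encoder for City is created:

    import org.apache.spark.sql.types.UDTRegistration
    import org.jetbrains.kotlinx.spark.api.*
    import org.jetbrains.kotlinx.spark.api.tuples.tupleOf
    import kotlin.reflect.jvm.jvmName

    fun main() = withSpark {
        // Point Spark at the UDT that (de)serializes City before any encoder for it is built.
        UDTRegistration.register(City::class.jvmName, CityUserDefinedType::class.jvmName)

        // The Kotlin encoder now resolves City through the registered UDT.
        listOf(City("Amsterdam", 1), City("Breda", 2))
            .map(::tupleOf)
            .toDS()
            .showDS()
    }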
From 29de611a5d4f73f92d006da35a2a6eea83140903 Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Wed, 1 Jun 2022 14:02:15 +0200
Subject: [PATCH 3/7] added tests and support for @SQLUserDefinedType

---
 .../kotlinx/spark/examples/UdtRegistration.kt |   7 +-
 kotlin-spark-api/3.2/pom_2.12.xml             |   6 +
 .../jetbrains/kotlinx/spark/api/Encoding.kt   |  17 ++-
 .../jetbrains/kotlinx/spark/api/UdtTest.kt    | 125 ++++++++++++++++++
 4 files changed, 149 insertions(+), 6 deletions(-)
 create mode 100644 kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
index e3d98dc6..6abd5638 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
@@ -45,10 +45,7 @@ class CityUserDefinedType : UserDefinedType<City>() {
 }
 
 @SQLUserDefinedType(udt = CityUserDefinedType::class)
-class City(
-    val name: String,
-    val departmentNumber: Int,
-) : Serializable {
+class City(val name: String, val departmentNumber: Int) : Serializable {
 
     override fun toString(): String =
         MoreObjects
@@ -60,7 +57,7 @@ class City(
 
 fun main() = withSpark {
 
-    UDTRegistration.register(City::class.jvmName, CityUserDefinedType::class.jvmName)
+//    UDTRegistration.register(City::class.jvmName, CityUserDefinedType::class.jvmName)
 
     val items = listOf(
         City("Amsterdam", 1),
diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml
index 22721fa4..6bd490ab 100644
--- a/kotlin-spark-api/3.2/pom_2.12.xml
+++ b/kotlin-spark-api/3.2/pom_2.12.xml
@@ -60,6 +60,12 @@
             <version>${spark3.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>io.kotest</groupId>
             <artifactId>kotest-runner-junit5-jvm</artifactId>
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt
index 882b7192..8f2e2159 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt
@@ -70,6 +70,7 @@ import kotlin.String
 import kotlin.Suppress
 import kotlin.reflect.*
 import kotlin.reflect.full.findAnnotation
+import kotlin.reflect.full.hasAnnotation
 import kotlin.reflect.full.isSubclassOf
 import kotlin.reflect.full.primaryConstructor
 import kotlin.reflect.jvm.jvmName
@@ -320,7 +321,21 @@ fun schema(type: KType, map: Map<String, KType> = mapOf()): DataType {
             KSimpleTypeWrapper(
                 /* dt = */ dataType,
                 /* cls = */ klass.java,
-                /* nullable = */ false
+                /* nullable = */ type.isMarkedNullable
             )
         }
 
+        klass.hasAnnotation<SQLUserDefinedType>() -> {
+            val dataType = klass.findAnnotation<SQLUserDefinedType>()!!
+                .udt
+                .java
+                .getConstructor()
+                .newInstance()
+
+            KSimpleTypeWrapper(
+                /* dt = */ dataType,
+                /* cls = */ klass.java,
+                /* nullable = */ type.isMarkedNullable
+            )
+        }
+
         else -> throw IllegalArgumentException("$type is unsupported")
     }
 }
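In isolation, the new annotation branch performs roughly the following lookup. This standalone sketch is not part of the patch; it mirrors the reflection chain above and assumes an @SQLUserDefinedType-annotated class like City:

    import org.apache.spark.sql.types.SQLUserDefinedType
    import org.apache.spark.sql.types.UserDefinedType
    import kotlin.reflect.KClass
    import kotlin.reflect.full.findAnnotation

    // Sketch: resolve the UDT instance that an @SQLUserDefinedType annotation points at,
    // or null when the class carries no such annotation.
    fun udtOf(klass: KClass<*>): UserDefinedType<*>? =
        klass.findAnnotation<SQLUserDefinedType>()
            ?.udt          // the UserDefinedType subclass named in the annotation
            ?.java
            ?.getConstructor()
            ?.newInstance()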
diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt
new file mode 100644
index 00000000..e120ffe7
--- /dev/null
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt
@@ -0,0 +1,125 @@
+package org.jetbrains.kotlinx.spark.api
+
+import io.kotest.core.spec.style.ShouldSpec
+import io.kotest.matchers.shouldBe
+import org.apache.hadoop.shaded.com.google.common.base.MoreObjects
+import org.apache.spark.ml.linalg.*
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.*
+import org.apache.spark.unsafe.types.UTF8String
+import org.jetbrains.kotlinx.spark.api.tuples.t
+import java.io.Serializable
+import kotlin.reflect.jvm.jvmName
+
+class UdtTest : ShouldSpec({
+    context("udt") {
+        withSpark {
+            should("Recognize UDTs from libraries like MlLib") {
+                val input = t(
+                    Vectors.dense(doubleArrayOf(1.0, 2.0, 3.0)),
+                    DenseVector(doubleArrayOf(1.0, 2.0, 3.0)),
+                    SparseVector(3, intArrayOf(0, 1, 2), doubleArrayOf(1.0, 2.0, 3.0)),
+                    Matrices.eye(1),
+                    DenseMatrix.eye(2),
+                    SparseMatrix.speye(2),
+                )
+
+                val ds = dsOf(input)
+
+                ds.collectAsList().single() shouldBe input
+            }
+
+            should("Recognize locally registered UDTs with annotation") {
+                val input = t(
+                    City("Amsterdam", 1),
+                    City("Breda", 2),
+                    City("Oosterhout", 3),
+                )
+
+                val ds = dsOf(input)
+
+                ds.collectAsList().single() shouldBe input
+            }
+
+            should("Recognize locally registered UDTs with register function") {
+                UDTRegistration.register(City::class.jvmName, CityUserDefinedType::class.jvmName)
+
+                val input = t(
+                    City("Amsterdam", 1),
+                    City("Breda", 2),
+                    City("Oosterhout", 3),
+                )
+
+                val ds = dsOf(input)
+
+                ds.collectAsList().single() shouldBe input
+            }
+        }
+    }
+})
+
+class CityUserDefinedType : UserDefinedType<City>() {
+
+    override fun sqlType(): DataType = DATA_TYPE
+
+    override fun serialize(city: City): InternalRow = GenericInternalRow(2).apply {
+        setInt(DEPT_NUMBER_INDEX, city.departmentNumber)
+        update(NAME_INDEX, UTF8String.fromString(city.name))
+    }
+
+    override fun deserialize(datum: Any): City =
+        if (datum is InternalRow)
+            City(
+                name = datum.getString(NAME_INDEX),
+                departmentNumber = datum.getInt(DEPT_NUMBER_INDEX),
+            )
+        else throw IllegalStateException("Unsupported conversion")
+
+    override fun userClass(): Class<City> = City::class.java
+
+    companion object {
+        private const val DEPT_NUMBER_INDEX = 0
+        private const val NAME_INDEX = 1
+        private val DATA_TYPE = DataTypes.createStructType(
+            arrayOf(
+                DataTypes.createStructField(
+                    "departmentNumber",
+                    DataTypes.IntegerType,
+                    false,
+                    MetadataBuilder().putLong("maxNumber", 99).build(),
+                ),
+                DataTypes.createStructField("name", DataTypes.StringType, false)
+            )
+        )
+    }
+}
+
+@SQLUserDefinedType(udt = CityUserDefinedType::class)
+class City(val name: String, val departmentNumber: Int) : Serializable {
+
+    override fun toString(): String =
+        MoreObjects
+            .toStringHelper(this)
+            .add("name", name)
+            .add("departmentNumber", departmentNumber)
+            .toString()
+
+    override fun equals(other: Any?): Boolean {
+        if (this === other) return true
+        if (javaClass != other?.javaClass) return false
+
+        other as City
+
+        if (name != other.name) return false
+        if (departmentNumber != other.departmentNumber) return false
+
+        return true
+    }
+
+    override fun hashCode(): Int {
+        var result = name.hashCode()
+        result = 31 * result + departmentNumber
+        return result
+    }
+}
\ No newline at end of file
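A side effect of passing type.isMarkedNullable instead of a hard-coded false is that nullability of UDT-typed values can survive encoding. A minimal sketch under that assumption, reusing City from the test above (not part of the patch):

    import org.jetbrains.kotlinx.spark.api.*
    import org.jetbrains.kotlinx.spark.api.tuples.t

    fun main() = withSpark {
        // The City? component should be marked nullable in the resulting schema.
        val ds = dsOf(t(1, City("Utrecht", 4) as City?), t(2, null as City?))
        ds.printSchema()
    }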
From 449407b10e54f2b333ef5e8407bbb0215b100651 Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Wed, 1 Jun 2022 16:08:24 +0200
Subject: [PATCH 4/7] added toDF and dfOf() helper functions, to further
 mirror the spark api.

---
 .../kotlinx/spark/examples/Correlation.kt     |  7 +--
 .../kotlinx/spark/examples/UdtRegistration.kt | 21 ++++++++-
 .../spark/api/jupyter/SparkIntegration.kt     | 14 ++++--
 .../jetbrains/kotlinx/spark/api/Dataset.kt    | 47 +++++++++++++++++--
 .../kotlinx/spark/api/SparkSession.kt         | 18 +++++--
 .../jetbrains/kotlinx/spark/api/UdtTest.kt    | 21 ++++++++-
 6 files changed, 112 insertions(+), 16 deletions(-)

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
index aa3b8b94..53ce3f6d 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType
 import org.jetbrains.kotlinx.spark.api.showDS
 import org.jetbrains.kotlinx.spark.api.tuples.*
 import org.jetbrains.kotlinx.spark.api.withSpark
+import scala.Tuple1
 
 
 fun main() = withSpark {
@@ -41,9 +42,9 @@ fun main() = withSpark {
         Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(9.0, 1.0))
     ).map(::tupleOf)
 
-    val df = data.toDS()
+    val df = data.toDF("features")
 
-    val r1: Matrix = Correlation.corr(df, "_1").head().getAs(0)
+    val r1 = Correlation.corr(df, "features").head().getAs<Matrix>(0)
     println(
         """
         |Pearson correlation matrix:
@@ -52,7 +53,7 @@ fun main() = withSpark {
         """.trimMargin()
     )
 
-    val r2: Matrix = Correlation.corr(df, "_1", "spearman").head().getAs(0)
+    val r2 = Correlation.corr(df, "features", "spearman").head().getAs<Matrix>(0)
     println(
         """
         |Spearman correlation matrix:
diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
index 6abd5638..0c468d91 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UdtRegistration.kt
@@ -1,3 +1,22 @@
+/*-
+ * =LICENSE=
+ * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
+ * ----------
+ * Copyright (C) 2019 - 2022 JetBrains
+ * ----------
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * =LICENSEEND=
+ */
 import org.apache.hadoop.shaded.com.google.common.base.MoreObjects
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
@@ -67,4 +86,4 @@ fun main() = withSpark {
 
     val ds = items.toDS()
     ds.showDS()
-}
\ No newline at end of file
+}
diff --git a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt
index e817b282..9ab2b8cf 100644
--- a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt
+++ b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt
@@ -54,17 +54,25 @@ internal class SparkIntegration : Integration() {
             """
                 inline fun <reified T> List<T>.toDS(): Dataset<T> = toDS(spark)""".trimIndent(),
             """
-                inline fun <reified T> Array<T>.toDS(): Dataset<T> = spark.dsOf(*this)""".trimIndent(),
+                inline fun <reified T> List<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)""".trimIndent(),
+            """
+                inline fun <reified T> Array<T>.toDS(): Dataset<T> = toDS(spark)""".trimIndent(),
+            """
+                inline fun <reified T> Array<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)""".trimIndent(),
             """
                 inline fun <reified T> dsOf(vararg arg: T): Dataset<T> = spark.dsOf(*arg)""".trimIndent(),
+            """
+                inline fun <reified T> dfOf(vararg arg: T): Dataset<Row> = spark.dfOf(*arg)""".trimIndent(),
+            """
+                inline fun <reified T> dfOf(colNames: Array<String>, vararg arg: T): Dataset<Row> = spark.dfOf(colNames, *arg)""".trimIndent(),
             """
                 inline fun <reified T> RDD<T>.toDS(): Dataset<T> = toDS(spark)""".trimIndent(),
             """
                 inline fun <reified T> JavaRDDLike<T, *>.toDS(): Dataset<T> = toDS(spark)""".trimIndent(),
             """
-                inline fun <reified T> RDD<T>.toDF(): Dataset<Row> = toDF(spark)""".trimIndent(),
+                inline fun <reified T> RDD<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)""".trimIndent(),
             """
-                inline fun <reified T> JavaRDDLike<T, *>.toDF(): Dataset<Row> = toDF(spark)""".trimIndent(),
+                inline fun <reified T> JavaRDDLike<T, *>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)""".trimIndent(),
             """
                 val udf: UDFRegistration
                     get() = spark.udf()""".trimIndent(),
         ).map(::execute)
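In a Jupyter cell these helpers come predefined; outside notebooks the same functions live on KSparkSession. A short usage sketch of the naming overloads (the column names here are illustrative, not from the patch):

    import org.jetbrains.kotlinx.spark.api.*
    import org.jetbrains.kotlinx.spark.api.tuples.t

    fun main() = withSpark {
        // toDS keeps the typed tuple columns (_1, _2)...
        val ds = listOf(t("Amsterdam", 1), t("Breda", 2)).toDS()

        // ...while toDF erases to Dataset<Row> and can rename the columns in one go.
        val df = listOf(t("Amsterdam", 1), t("Breda", 2)).toDF("city", "number")
        df.show()
    }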
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
index 71abb1ee..39a60bc8 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
@@ -51,11 +51,30 @@ import kotlin.reflect.KProperty1
 inline fun <reified T> SparkSession.toDS(list: List<T>): Dataset<T> =
     createDataset(list, encoder())
 
+/**
+ * Utility method to create dataframe from list
+ */
+inline fun <reified T> SparkSession.toDF(list: List<T>, vararg colNames: String): Dataset<Row> =
+    toDS(list).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
+
 /**
  * Utility method to create dataset from *array or vararg arguments
  */
 inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
-    createDataset(listOf(*t), encoder())
+    createDataset(t.toList(), encoder())
+
+/**
+ * Utility method to create dataframe from *array or vararg arguments
+ */
+inline fun <reified T> SparkSession.dfOf(vararg t: T): Dataset<Row> =
+    createDataset(t.toList(), encoder()).toDF()
+
+/**
+ * Utility method to create dataframe from *array or vararg arguments with given column names
+ */
+inline fun <reified T> SparkSession.dfOf(colNames: Array<String>, vararg t: T): Dataset<Row> =
+    createDataset(t.toList(), encoder())
+        .run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
 
 /**
  * Utility method to create dataset from list
@@ -63,6 +82,24 @@ inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
 inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
     spark.createDataset(this, encoder())
 
+/**
+ * Utility method to create dataframe from list
+ */
+inline fun <reified T> List<T>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
+    toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
+
+/**
+ * Utility method to create dataset from list
+ */
+inline fun <reified T> Array<T>.toDS(spark: SparkSession): Dataset<T> =
+    toList().toDS(spark)
+
+/**
+ * Utility method to create dataframe from list
+ */
+inline fun <reified T> Array<T>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
+    toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
+
 /**
  * Utility method to create dataset from RDD
  */
@@ -79,15 +116,15 @@ inline fun <reified T> JavaRDDLike<T, *>.toDS(spark: SparkSession): Dataset<T> =
  * Utility method to create Dataset<Row> (Dataframe) from JavaRDD.
  * NOTE: [T] must be [Serializable].
  */
-inline fun <reified T> JavaRDDLike<T, *>.toDF(spark: SparkSession): Dataset<Row> =
-    toDS(spark).toDF()
+inline fun <reified T> JavaRDDLike<T, *>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
+    toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
 
 /**
  * Utility method to create Dataset<Row> (Dataframe) from RDD.
  * NOTE: [T] must be [Serializable].
 */
-inline fun <reified T> RDD<T>.toDF(spark: SparkSession): Dataset<Row> =
-    toDS(spark).toDF()
+inline fun <reified T> RDD<T>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
+    toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
 
 
 /**
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt
index 652e52b7..a740efad 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt
@@ -60,12 +60,24 @@ class KSparkSession(val spark: SparkSession) {
     /** Utility method to create dataset from list. */
     inline fun <reified T> List<T>.toDS(): Dataset<T> = toDS(spark)
 
+    /** Utility method to create dataframe from list. */
+    inline fun <reified T> List<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)
+
     /** Utility method to create dataset from [Array]. */
-    inline fun <reified T> Array<T>.toDS(): Dataset<T> = spark.dsOf(*this)
+    inline fun <reified T> Array<T>.toDS(): Dataset<T> = toDS(spark)
+
+    /** Utility method to create dataframe from [Array]. */
+    inline fun <reified T> Array<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)
 
     /** Utility method to create dataset from vararg arguments. */
     inline fun <reified T> dsOf(vararg arg: T): Dataset<T> = spark.dsOf(*arg)
 
+    /** Utility method to create dataframe from *array or vararg arguments */
+    inline fun <reified T> dfOf(vararg arg: T): Dataset<Row> = spark.dfOf(*arg)
+
+    /** Utility method to create dataframe from *array or vararg arguments with given column names */
+    inline fun <reified T> dfOf(colNames: Array<String>, vararg arg: T): Dataset<Row> = spark.dfOf(colNames, *arg)
+
     /** Utility method to create dataset from Scala [RDD]. */
     inline fun <reified T> RDD<T>.toDS(): Dataset<T> = toDS(spark)
 
@@ -76,13 +88,13 @@ class KSparkSession(val spark: SparkSession) {
      * Utility method to create Dataset<Row> (Dataframe) from RDD.
      * NOTE: [T] must be [Serializable].
      */
-    inline fun <reified T> RDD<T>.toDF(): Dataset<Row> = toDF(spark)
+    inline fun <reified T> RDD<T>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)
 
     /**
      * Utility method to create Dataset<Row> (Dataframe) from JavaRDD.
      * NOTE: [T] must be [Serializable].
      */
-    inline fun <reified T> JavaRDDLike<T, *>.toDF(): Dataset<Row> = toDF(spark)
+    inline fun <reified T> JavaRDDLike<T, *>.toDF(vararg colNames: String): Dataset<Row> = toDF(spark, *colNames)
 
     /**
     * A collection of methods for registering user-defined functions (UDF).
diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt
index e120ffe7..411bee2c 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UdtTest.kt
@@ -1,3 +1,22 @@
+/*-
+ * =LICENSE=
+ * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12)
+ * ----------
+ * Copyright (C) 2019 - 2022 JetBrains
+ * ----------
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * =LICENSEEND=
+ */
 package org.jetbrains.kotlinx.spark.api
 
 import io.kotest.core.spec.style.ShouldSpec
@@ -122,4 +141,4 @@ class City(val name: String, val departmentNumber: Int) : Serializable {
         result = 31 * result + departmentNumber
         return result
     }
-}
\ No newline at end of file
+}

From 5de11c67b82fce2e2e1980775895c519a4298426 Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Wed, 1 Jun 2022 17:57:30 +0200
Subject: [PATCH 5/7] added typedCol() and MLlib samples

---
 .../kotlinx/spark/examples/Correlation.kt     |  64 ---------
 .../jetbrains/kotlinx/spark/examples/MLlib.kt | 134 ++++++++++++++++++
 .../org/jetbrains/kotlinx/spark/api/Column.kt |  25 +++-
 3 files changed, 158 insertions(+), 65 deletions(-)
 delete mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
 create mode 100644 examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
deleted file mode 100644
index 53ce3f6d..00000000
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Correlation.kt
+++ /dev/null
@@ -1,64 +0,0 @@
-/*-
- * =LICENSE=
- * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
- * ----------
- * Copyright (C) 2019 - 2022 JetBrains
- * ----------
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * =LICENSEEND=
- */
-package org.jetbrains.kotlinx.spark.examples
-
-import org.apache.spark.ml.linalg.Matrix
-import org.apache.spark.ml.linalg.VectorUDT
-import org.apache.spark.ml.linalg.Vectors
-import org.apache.spark.ml.stat.Correlation
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.RowFactory
-import org.apache.spark.sql.types.Metadata
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.types.StructType
-import org.jetbrains.kotlinx.spark.api.showDS
-import org.jetbrains.kotlinx.spark.api.tuples.*
-import org.jetbrains.kotlinx.spark.api.withSpark
-import scala.Tuple1
-
-
-fun main() = withSpark {
-    val data = listOf(
-        Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(1.0, -2.0)),
-        Vectors.dense(4.0, 5.0, 0.0, 3.0),
-        Vectors.dense(6.0, 7.0, 0.0, 8.0),
-        Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(9.0, 1.0))
-    ).map(::tupleOf)
-
-    val df = data.toDF("features")
-
-    val r1 = Correlation.corr(df, "features").head().getAs<Matrix>(0)
-    println(
-        """
-        |Pearson correlation matrix:
-        |$r1
-        |
-        """.trimMargin()
-    )
-
-    val r2 = Correlation.corr(df, "features", "spearman").head().getAs<Matrix>(0)
-    println(
-        """
-        |Spearman correlation matrix:
-        |$r2
-        |
-        """.trimMargin()
-    )
-}
\ No newline at end of file
diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt
new file mode 100644
index 00000000..32c01370
--- /dev/null
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt
@@ -0,0 +1,134 @@
+/*-
+ * =LICENSE=
+ * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
+ * ----------
+ * Copyright (C) 2019 - 2022 JetBrains
+ * ----------
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * =LICENSEEND=
+ */
+package org.jetbrains.kotlinx.spark.examples
+
+import org.apache.spark.*
+import org.apache.spark.ml.linalg.Matrix
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.stat.ChiSquareTest
+import org.apache.spark.ml.stat.Correlation
+import org.apache.spark.ml.stat.Summarizer
+import org.apache.spark.ml.stat.Summarizer.*
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
+import scala.collection.mutable.WrappedArray
+
+
+fun main() = withSpark {
+    correlation()
+    chiSquare()
+    summarizer()
+}
+
+private fun KSparkSession.correlation() {
+    println("Correlation:")
+
+    val data = listOf(
+        Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(1.0, -2.0)),
+        Vectors.dense(4.0, 5.0, 0.0, 3.0),
+        Vectors.dense(6.0, 7.0, 0.0, 8.0),
+        Vectors.sparse(4, intArrayOf(0, 3), doubleArrayOf(9.0, 1.0))
+    ).map(::tupleOf)
+
+    val df = data.toDF("features")
+
+    val r1 = Correlation.corr(df, "features").head().getAs<Matrix>(0)
+    println(
+        """
+        |Pearson correlation matrix:
+        |$r1
+        |
+        """.trimMargin()
+    )
+
+    val r2 = Correlation.corr(df, "features", "spearman").head().getAs<Matrix>(0)
+    println(
+        """
+        |Spearman correlation matrix:
+        |$r2
+        |
+        """.trimMargin()
+    )
+}
+
+private fun KSparkSession.chiSquare() {
+    println("ChiSquare:")
+
+    val data = listOf(
+        t(0.0, Vectors.dense(0.5, 10.0)),
+        t(0.0, Vectors.dense(1.5, 20.0)),
+        t(1.0, Vectors.dense(1.5, 30.0)),
+        t(0.0, Vectors.dense(3.5, 30.0)),
+        t(0.0, Vectors.dense(3.5, 40.0)),
+        t(1.0, Vectors.dense(3.5, 40.0)),
+    )
+
+    // while df.getAs<Vector>(0) works, it's often easier to just parse the result as a typed Dataset :)
+    data class ChiSquareTestResult(
+        val pValues: Vector,
+        val degreesOfFreedom: List<Int>,
+        val statistics: Vector,
+    )
+
+    val df: Dataset<Row> = data.toDF("label", "features")
+    val chi = ChiSquareTest.test(df, "features", "label")
+        .to<ChiSquareTestResult>()
+        .head()
+
+    println("pValues: ${chi.pValues}")
+    println("degreesOfFreedom: ${chi.degreesOfFreedom}")
+    println("statistics: ${chi.statistics}")
+    println()
+}
+
+private fun KSparkSession.summarizer() {
+    println("Summarizer:")
+
+    val data = listOf(
+        t(Vectors.dense(2.0, 3.0, 5.0), 1.0),
+        t(Vectors.dense(4.0, 6.0, 7.0), 2.0)
+    )
+
+    val df = data.toDF("features", "weight")
+
+    val result1 = df
+        .select(
+            metrics("mean", "variance")
+                .summary(col("features"), col("weight")).`as`("summary")
+        )
+        .select("summary.mean", "summary.variance")
+        .first()
+
+    println("with weight: mean = ${result1.getAs<Vector>(0)}, variance = ${result1.getAs<Vector>(1)}")
+
+    val result2 = df
+        .select(
+            mean(col("features")),
+            variance(col("features")),
+        )
+        .first()
+
+    println("without weight: mean = ${result2.getAs<Vector>(0)}, variance = ${result2.getAs<Vector>(1)}")
+    println()
+}
\ No newline at end of file
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
index 72e8a9b7..ae49670f 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
@@ -420,13 +420,36 @@ inline fun <reified T> Column.`as`(): TypedColumn<Any?, T> = `as`(encoder<T>())
  */
 fun lit(a: Any): Column = functions.lit(a)
 
+/**
+ * Returns a [TypedColumn] based on the given column name and type [T].
+ *
+ * This is just a shortcut to the function from [org.apache.spark.sql.functions] combined with an [as] call.
+ * For all the functions, simply add `import org.apache.spark.sql.functions.*` to your file.
+ *
+ * @see col
+ * @see as
+ */
+inline fun <reified T> typedCol(colName: String): TypedColumn<Any?, T> = functions.col(colName).`as`<T>()
+
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
  * ```kotlin
  * val dataset: Dataset<YourClass> = ...
  * val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( col(YourClass::a), col(YourClass::b) )
  * ```
+ * @see typedCol
+ */
+@Suppress("UNCHECKED_CAST")
+inline fun <T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> = typedCol(column)
+
+/**
+ * Returns a [Column] based on the given class attribute, not connected to a dataset.
+ * ```kotlin
+ * val dataset: Dataset<YourClass> = ...
+ * val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( typedCol(YourClass::a), typedCol(YourClass::b) )
+ * ```
+ * @see col
  */
 @Suppress("UNCHECKED_CAST")
-inline fun <T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
+inline fun <T, reified U> typedCol(column: KProperty1<T, U>): TypedColumn<T, U> =
     functions.col(column.name).`as`<U>() as TypedColumn<T, U>
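A sketch of the KDoc's select example with concrete names (Person is a stand-in data class, not from the patch):

    import org.jetbrains.kotlinx.spark.api.*

    data class Person(val name: String, val age: Int)

    fun main() = withSpark {
        val ds = dsOf(Person("Jane", 32), Person("John", 30))
        // typedCol keeps the property types, so the selection stays a typed Dataset
        // of Tuple2<String, Int> rather than an untyped DataFrame.
        ds.select(typedCol(Person::name), typedCol(Person::age))
            .show()
    }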
From 20dbd012edfb0ad603525cbc5870e87964801176 Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Wed, 1 Jun 2022 18:15:31 +0200
Subject: [PATCH 6/7] more samples, can probably safely say all works

---
 .../jetbrains/kotlinx/spark/examples/MLlib.kt | 150 +++++++++++++++++-
 1 file changed, 145 insertions(+), 5 deletions(-)

diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt
index 32c01370..e1eaeb8e 100644
--- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt
+++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MLlib.kt
@@ -19,26 +19,38 @@
  */
 package org.jetbrains.kotlinx.spark.examples
 
-import org.apache.spark.*
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.PipelineModel
+import org.apache.spark.ml.classification.LogisticRegression
+import org.apache.spark.ml.classification.LogisticRegressionModel
+import org.apache.spark.ml.feature.HashingTF
+import org.apache.spark.ml.feature.Tokenizer
 import org.apache.spark.ml.linalg.Matrix
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.stat.ChiSquareTest
 import org.apache.spark.ml.stat.Correlation
-import org.apache.spark.ml.stat.Summarizer
 import org.apache.spark.ml.stat.Summarizer.*
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
-import org.jetbrains.kotlinx.spark.api.*
-import org.jetbrains.kotlinx.spark.api.tuples.*
-import scala.collection.mutable.WrappedArray
+import org.jetbrains.kotlinx.spark.api.KSparkSession
+import org.jetbrains.kotlinx.spark.api.to
+import org.jetbrains.kotlinx.spark.api.tuples.t
+import org.jetbrains.kotlinx.spark.api.tuples.tupleOf
+import org.jetbrains.kotlinx.spark.api.withSpark
 
 
 fun main() = withSpark {
+    // https://spark.apache.org/docs/latest/ml-statistics.html
     correlation()
     chiSquare()
     summarizer()
+
+    // https://spark.apache.org/docs/latest/ml-pipeline.html
+    estimatorTransformerParam()
+    pipeline()
 }
 
 private fun KSparkSession.correlation() {
@@ -131,4 +143,132 @@ private fun KSparkSession.summarizer() {
 
     println("without weight: mean = ${result2.getAs<Vector>(0)}, variance = ${result2.getAs<Vector>(1)}")
     println()
-}
\ No newline at end of file
+}
+
+private fun KSparkSession.estimatorTransformerParam() {
+    println("Estimator, Transformer, and Param")
+
+    // Prepare
training data from a list of (label, features) tuples. + val training = listOf( + t(1.0, Vectors.dense(0.0, 1.1, 0.1)), + t(0.0, Vectors.dense(2.0, 1.0, -1.0)), + t(0.0, Vectors.dense(2.0, 1.3, 1.0)), + t(1.0, Vectors.dense(0.0, 1.2, -0.5)) + ).toDF("label", "features") + + // Create a LogisticRegression instance. This instance is an Estimator. + val lr = LogisticRegression() + + // Print out the parameters, documentation, and any default values. + println("LogisticRegression parameters:\n ${lr.explainParams()}\n") + + // We may set parameters using setter methods. + lr.apply { + maxIter = 10 + regParam = 0.01 + } + + // Learn a LogisticRegression model. This uses the parameters stored in lr. + val model1 = lr.fit(training) + // Since model1 is a Model (i.e., a Transformer produced by an Estimator), + // we can view the parameters it used during fit(). + // This prints the parameter (name: value) pairs, where names are unique IDs for this + // LogisticRegression instance. + println("Model 1 was fit using parameters: ${model1.parent().extractParamMap()}") + + // We may alternatively specify parameters using a ParamMap. + val paramMap = ParamMap() + .put(lr.maxIter().w(20)) // Specify 1 Param. + .put(lr.maxIter(), 30) // This overwrites the original maxIter. + .put(lr.regParam().w(0.1), lr.threshold().w(0.55)) // Specify multiple Params. + + // One can also combine ParamMaps. + val paramMap2 = ParamMap() + .put(lr.probabilityCol().w("myProbability")) // Change output column name + + val paramMapCombined = paramMap.`$plus$plus`(paramMap2) + + // Now learn a new model using the paramMapCombined parameters. + // paramMapCombined overrides all parameters set earlier via lr.set* methods. + val model2: LogisticRegressionModel = lr.fit(training, paramMapCombined) + println("Model 2 was fit using parameters: ${model2.parent().extractParamMap()}") + + // Prepare test documents. + val test = listOf( + t(1.0, Vectors.dense(-1.0, 1.5, 1.3)), + t(0.0, Vectors.dense(3.0, 2.0, -0.1)), + t(1.0, Vectors.dense(0.0, 2.2, -1.5)), + ).toDF("label", "features") + + // Make predictions on test documents using the Transformer.transform() method. + // LogisticRegression.transform will only use the 'features' column. + // Note that model2.transform() outputs a 'myProbability' column instead of the usual + // 'probability' column since we renamed the lr.probabilityCol parameter previously. + val results = model2.transform(test) + val rows = results.select("features", "label", "myProbability", "prediction") + for (r: Row in rows.collectAsList()) + println("(${r[0]}, ${r[1]}) -> prob=${r[2]}, prediction=${r[3]}") + + println() +} + +private fun KSparkSession.pipeline() { + println("Pipeline:") +// Prepare training documents from a list of (id, text, label) tuples. + val training = listOf( + t(0L, "a b c d e spark", 1.0), + t(1L, "b d", 0.0), + t(2L, "spark f g h", 1.0), + t(3L, "hadoop mapreduce", 0.0) + ).toDF("id", "text", "label") + + // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr. + val tokenizer = Tokenizer() + .setInputCol("text") + .setOutputCol("words") + val hashingTF = HashingTF() + .setNumFeatures(1000) + .setInputCol(tokenizer.outputCol) + .setOutputCol("features") + val lr = LogisticRegression() + .setMaxIter(10) + .setRegParam(0.001) + val pipeline = Pipeline() + .setStages( + arrayOf( + tokenizer, + hashingTF, + lr, + ) + ) + + // Fit the pipeline to training documents. 
+    val model = pipeline.fit(training)
+
+    // Now we can optionally save the fitted pipeline to disk
+    model.write().overwrite().save("/tmp/spark-logistic-regression-model")
+
+    // We can also save this unfit pipeline to disk
+    pipeline.write().overwrite().save("/tmp/unfit-lr-model")
+
+    // And load it back in during production
+    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
+
+    // Prepare test documents, which are unlabeled (id, text) tuples.
+    val test = listOf(
+        t(4L, "spark i j k"),
+        t(5L, "l m n"),
+        t(6L, "spark hadoop spark"),
+        t(7L, "apache hadoop"),
+    ).toDF("id", "text")
+
+    // Make predictions on test documents.
+    val predictions = model.transform(test)
+        .select("id", "text", "probability", "prediction")
+        .collectAsList()
+
+    for (r in predictions)
+        println("(${r[0]}, ${r[1]}) --> prob=${r[2]}, prediction=${r[3]}")
+
+    println()
+}
\ No newline at end of file
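The persisted pipeline can then be reloaded in a separate application. A sketch, with the path matching the save() call above (the test row is illustrative):

    import org.apache.spark.ml.PipelineModel
    import org.jetbrains.kotlinx.spark.api.*
    import org.jetbrains.kotlinx.spark.api.tuples.t

    fun main() = withSpark {
        // Load the fitted PipelineModel that the example saved to disk.
        val model = PipelineModel.load("/tmp/spark-logistic-regression-model")
        val test = listOf(t(8L, "spark streaming")).toDF("id", "text")
        model.transform(test).select("id", "probability", "prediction").show()
    }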
From d44bd8c21e88969a1e589b10832d0cd2677e794f Mon Sep 17 00:00:00 2001
From: Jolanrensen
Date: Wed, 15 Jun 2022 17:52:59 +0200
Subject: [PATCH 7/7] removing typedCol since udf is already adding it.
 Suppressing compiler warnings for deprecation and other generated functions

---
 .../jetbrains/kotlinx/spark/api/Arities.kt    |  4 ++-
 .../org/jetbrains/kotlinx/spark/api/Column.kt | 25 +------------------
 .../kotlinx/spark/api/Conversions.kt          |  2 +-
 .../jetbrains/kotlinx/spark/api/Dataset.kt    |  2 ++
 .../spark/api/tuples/TupleConcatenation.kt    |  2 +-
 .../spark/api/tuples/TupleExtending.kt        |  2 +-
 .../kotlinx/spark/api/tuples/TupleZip.kt      |  1 +
 7 files changed, 10 insertions(+), 28 deletions(-)

diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Arities.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Arities.kt
index 93371fee..ed405f6a 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Arities.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Arities.kt
@@ -19,6 +19,8 @@
  */
 
 /**
+ * DEPRECATED: Use Scala tuples instead.
+ *
  * Helper classes and functions to work with unnamed tuples we call Arities.
  * Arities are easier to work with in Kotlin than Scala Tuples since they are Kotlin data classes.
  * This means they can be destructured, copied, etc.
@@ -39,7 +41,7 @@
  * }
  * ```
  */
-
+@file:Suppress("DEPRECATION")
 package org.jetbrains.kotlinx.spark.api
 
 import java.io.Serializable
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
index ae49670f..72e8a9b7 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Column.kt
@@ -420,36 +420,13 @@ inline fun <reified T> Column.`as`(): TypedColumn<Any?, T> = `as`(encoder<T>())
  */
 fun lit(a: Any): Column = functions.lit(a)
 
-/**
- * Returns a [TypedColumn] based on the given column name and type [T].
- *
- * This is just a shortcut to the function from [org.apache.spark.sql.functions] combined with an [as] call.
- * For all the functions, simply add `import org.apache.spark.sql.functions.*` to your file.
- *
- * @see col
- * @see as
- */
-inline fun <reified T> typedCol(colName: String): TypedColumn<Any?, T> = functions.col(colName).`as`<T>()
-
 /**
  * Returns a [Column] based on the given class attribute, not connected to a dataset.
  * ```kotlin
  * val dataset: Dataset<YourClass> = ...
 * val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( col(YourClass::a), col(YourClass::b) )
 * ```
- * @see typedCol
- */
-@Suppress("UNCHECKED_CAST")
-inline fun <T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> = typedCol(column)
-
-/**
- * Returns a [Column] based on the given class attribute, not connected to a dataset.
- * ```kotlin
- * val dataset: Dataset<YourClass> = ...
- * val new: Dataset<Tuple2<TypeOfA, TypeOfB>> = dataset.select( typedCol(YourClass::a), typedCol(YourClass::b) )
- * ```
- * @see col
  */
 @Suppress("UNCHECKED_CAST")
-inline fun <T, reified U> typedCol(column: KProperty1<T, U>): TypedColumn<T, U> =
+inline fun <T, reified U> col(column: KProperty1<T, U>): TypedColumn<T, U> =
     functions.col(column.name).`as`<U>() as TypedColumn<T, U>
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt
index f8d90fa3..b9b5d306 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt
@@ -23,7 +23,7 @@
  * and Kotlin/Java variants.
  */
 
-@file:Suppress("NOTHING_TO_INLINE", "RemoveExplicitTypeArguments", "unused")
+@file:Suppress("NOTHING_TO_INLINE", "RemoveExplicitTypeArguments", "unused", "DEPRECATION")
 
 package org.jetbrains.kotlinx.spark.api
 
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
index 39a60bc8..d44c62dd 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt
@@ -197,6 +197,7 @@ inline fun <T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> = map
  * (Kotlin-specific)
  * Maps the Dataset to only retain the "keys" or [Arity2._1] values.
  */
+@Suppress("DEPRECATION")
 @JvmName("takeKeysArity2")
 @Deprecated("Use Scala tuples instead.", ReplaceWith(""))
 inline fun <T1, T2> Dataset<Arity2<T1, T2>>.takeKeys(): Dataset<T1> = map { it._1 }
@@ -218,6 +219,7 @@ inline fun <T1, T2> Dataset<Tuple2<T1, T2>>.takeValues(): Dataset<T2> = map
  * (Kotlin-specific)
  * Maps the Dataset to only retain the "values" or [Arity2._2] values.
  */
+@Suppress("DEPRECATION")
 @JvmName("takeValuesArity2")
 @Deprecated("Use Scala tuples instead.", ReplaceWith(""))
 inline fun <T1, T2> Dataset<Arity2<T1, T2>>.takeValues(): Dataset<T2> = map { it._2 }
diff --git a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleConcatenation.kt b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleConcatenation.kt
index fe07a4ff..5aafad98 100644
--- a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleConcatenation.kt
+++ b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleConcatenation.kt
@@ -17,7 +17,7 @@
 * limitations under the License.
* =LICENSEEND= */ -@file:Suppress("FunctionName", "RemoveExplicitTypeArguments") +@file:Suppress("FunctionName", "RemoveExplicitTypeArguments", "UNUSED_PARAMETER") package org.jetbrains.kotlinx.spark.api.tuples import scala.Tuple1 diff --git a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleExtending.kt b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleExtending.kt index 595d1883..230b889a 100644 --- a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleExtending.kt +++ b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleExtending.kt @@ -17,7 +17,7 @@ * limitations under the License. * =LICENSEEND= */ -@file:Suppress("FunctionName", "RemoveExplicitTypeArguments") +@file:Suppress("FunctionName", "RemoveExplicitTypeArguments", "UNUSED_PARAMETER") package org.jetbrains.kotlinx.spark.api.tuples import scala.Tuple1 diff --git a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleZip.kt b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleZip.kt index c4930996..608cc804 100644 --- a/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleZip.kt +++ b/scala-tuples-in-kotlin/src/main/kotlin/org/jetbrains/kotlinx/spark/api/tuples/TupleZip.kt @@ -17,6 +17,7 @@ * limitations under the License. * =LICENSEEND= */ +@file:Suppress("UNUSED_PARAMETER") package org.jetbrains.kotlinx.spark.api.tuples import scala.*
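Taken together, the series leaves the API at a point where MLlib's UDT-backed types and DataFrame column naming compose. A closing sketch, mirroring the Correlation example from the patches above:

    import org.apache.spark.ml.linalg.Matrix
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.stat.Correlation
    import org.jetbrains.kotlinx.spark.api.*
    import org.jetbrains.kotlinx.spark.api.tuples.tupleOf

    fun main() = withSpark {
        // VectorUDT ships with MLlib, so vectors encode without extra setup...
        val df = listOf(
            Vectors.dense(4.0, 5.0, 0.0, 3.0),
            Vectors.dense(6.0, 7.0, 0.0, 8.0),
        ).map(::tupleOf).toDF("features")  // ...and toDF names the column for corr().

        println(Correlation.corr(df, "features").head().getAs<Matrix>(0))
    }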