diff --git a/examples/pom-3.2_2.12.xml b/examples/pom-3.2_2.12.xml
index b5267352..5f214b69 100644
--- a/examples/pom-3.2_2.12.xml
+++ b/examples/pom-3.2_2.12.xml
@@ -24,6 +24,11 @@
             <artifactId>spark-sql_${scala.compat.version}</artifactId>
             <version>${spark3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+        </dependency>
diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml
index 756d9c2b..826547d2 100644
--- a/kotlin-spark-api/3.2/pom_2.12.xml
+++ b/kotlin-spark-api/3.2/pom_2.12.xml
@@ -36,6 +36,12 @@
             <version>${spark3.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
+            <version>${spark3.version}</version>
+            <scope>provided</scope>
+        </dependency>
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
index 1061a21a..7e9ef135 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt
@@ -23,9 +23,10 @@ package org.jetbrains.kotlinx.spark.api
 
 import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils
 import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.api.java.*
 import org.apache.spark.api.java.function.*
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.*
 import org.apache.spark.sql.Encoders.*
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -154,6 +155,18 @@ inline fun <reified T> SparkSession.dsOf(vararg t: T): Dataset<T> =
 inline fun <reified T> List<T>.toDS(spark: SparkSession): Dataset<T> =
     spark.createDataset(this, encoder())
 
+/**
+ * Utility method to create dataset from RDD
+ */
+inline fun <reified T> RDD<T>.toDS(spark: SparkSession): Dataset<T> =
+    spark.createDataset(this, encoder())
+
+/**
+ * Utility method to create dataset from JavaRDD
+ */
+inline fun <reified T> JavaRDDLike<T, *>.toDS(spark: SparkSession): Dataset<T> =
+    spark.createDataset(this.rdd(), encoder())
+
 /**
  * Main method of API, which gives you seamless integration with Spark:
  * It creates encoder for any given supported type T
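
For orientation, a minimal usage sketch of the two conversions introduced above (not part of the patch); the SparkSession parameter, the sample data, and the function name are illustrative assumptions:

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import org.jetbrains.kotlinx.spark.api.toDS

// Sketch only: converts the same data once via JavaRDDLike<T, *>.toDS and once via RDD<T>.toDS.
fun rddsToDatasets(spark: SparkSession) {
    val javaRdd: JavaRDD<Int> = JavaSparkContext(spark.sparkContext).parallelize(listOf(1, 2, 3))

    val fromJavaRdd: Dataset<Int> = javaRdd.toDS(spark)        // JavaRDDLike<T, *>.toDS
    val fromScalaRdd: Dataset<Int> = javaRdd.rdd().toDS(spark) // RDD<T>.toDS

    fromJavaRdd.show()
    fromScalaRdd.show()
}
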
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
index 6188daae..98fdae8d 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt
@@ -20,6 +20,11 @@ package org.jetbrains.kotlinx.spark.api
 
 import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.UDFRegistration
 import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
@@ -78,18 +83,38 @@ inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KS
             KSparkSession(this).apply {
                 sparkContext.setLogLevel(logLevel)
                 func()
+                spark.stop()
             }
         }
-        .also { it.stop() }
+}
+
+/**
+ * Wrapper for spark creation which copies params from [sparkConf].
+ *
+ * @param sparkConf Sets a list of config options based on this.
+ * @param logLevel Control our logLevel. This overrides any user-defined log settings.
+ * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession])
+ */
+@JvmOverloads
+inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) {
+    withSpark(
+        builder = SparkSession.builder().config(sparkConf),
+        logLevel = logLevel,
+        func = func,
+    )
 }
 
 /**
  * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]
  */
-@Suppress("EXPERIMENTAL_FEATURE_WARNING", "unused")
-inline class KSparkSession(val spark: SparkSession) {
+class KSparkSession(val spark: SparkSession) {
+
+    val sc: JavaSparkContext by lazy { JavaSparkContext(spark.sparkContext) }
+
     inline fun <reified T> List<T>.toDS() = toDS(spark)
     inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)
     inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)
+    inline fun <reified T> RDD<T>.toDS() = toDS(spark)
+    inline fun <reified T> JavaRDDLike<T, *>.toDS() = toDS(spark)
     val udf: UDFRegistration get() = spark.udf()
 }
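
A sketch of the new SparkConf-based entry point and the lazily created JavaSparkContext (not part of the patch); the app name and master value are illustrative placeholders:

import org.apache.spark.SparkConf
import org.jetbrains.kotlinx.spark.api.withSpark

// Sketch only: configuration values are placeholders.
fun main() {
    val conf = SparkConf()
        .setAppName("rdd-to-ds-demo")
        .setMaster("local[*]")

    withSpark(sparkConf = conf) {
        // `sc` is the lazy JavaSparkContext added to KSparkSession above.
        sc.parallelize(listOf(1, 2, 3, 4, 5, 6))
            .toDS()   // JavaRDD<Int> -> Dataset<Int> via the new member extension
            .show()
    }
}
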
diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
index a4b2bdd7..af870038 100644
--- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
+++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt
@@ -22,32 +22,34 @@
  */
 package org.jetbrains.kotlinx.spark.api
 
-data class Arity1<T1>(val _1: T1)
-data class Arity2<T1, T2>(val _1: T1, val _2: T2)
-data class Arity3<T1, T2, T3>(val _1: T1, val _2: T2, val _3: T3)
-data class Arity4<T1, T2, T3, T4>(val _1: T1, val _2: T2, val _3: T3, val _4: T4)
-data class Arity5<T1, T2, T3, T4, T5>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5)
-data class Arity6<T1, T2, T3, T4, T5, T6>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6)
-data class Arity7<T1, T2, T3, T4, T5, T6, T7>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7)
-data class Arity8<T1, T2, T3, T4, T5, T6, T7, T8>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8)
-data class Arity9<T1, T2, T3, T4, T5, T6, T7, T8, T9>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9)
-data class Arity10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10)
-data class Arity11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11)
-data class Arity12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12)
-data class Arity13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13)
-data class Arity14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14)
-data class Arity15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15)
-data class Arity16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16)
-data class Arity17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17)
-data class Arity18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18)
-data class Arity19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19)
-data class Arity20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20)
-data class Arity21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21)
-data class Arity22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22)
-data class Arity23<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23)
-data class Arity24<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24)
-data class Arity25<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25)
-data class Arity26<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25, T26>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25, val _26: T26)
+import java.io.Serializable
+
+data class Arity1<T1>(val _1: T1): Serializable
+data class Arity2<T1, T2>(val _1: T1, val _2: T2): Serializable
+data class Arity3<T1, T2, T3>(val _1: T1, val _2: T2, val _3: T3): Serializable
+data class Arity4<T1, T2, T3, T4>(val _1: T1, val _2: T2, val _3: T3, val _4: T4): Serializable
+data class Arity5<T1, T2, T3, T4, T5>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5): Serializable
+data class Arity6<T1, T2, T3, T4, T5, T6>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6): Serializable
+data class Arity7<T1, T2, T3, T4, T5, T6, T7>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7): Serializable
+data class Arity8<T1, T2, T3, T4, T5, T6, T7, T8>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8): Serializable
+data class Arity9<T1, T2, T3, T4, T5, T6, T7, T8, T9>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9): Serializable
+data class Arity10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10): Serializable
+data class Arity11<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11): Serializable
+data class Arity12<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12): Serializable
+data class Arity13<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13): Serializable
+data class Arity14<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14): Serializable
+data class Arity15<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15): Serializable
+data class Arity16<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16): Serializable
+data class Arity17<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17): Serializable
+data class Arity18<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18): Serializable
+data class Arity19<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19): Serializable
+data class Arity20<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20): Serializable
+data class Arity21<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21): Serializable
+data class Arity22<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22): Serializable
+data class Arity23<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23): Serializable
+data class Arity24<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24): Serializable
+data class Arity25<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25): Serializable
+data class Arity26<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24, T25, T26>(val _1: T1, val _2: T2, val _3: T3, val _4: T4, val _5: T5, val _6: T6, val _7: T7, val _8: T8, val _9: T9, val _10: T10, val _11: T11, val _12: T12, val _13: T13, val _14: T14, val _15: T15, val _16: T16, val _17: T17, val _18: T18, val _19: T19, val _20: T20, val _21: T21, val _22: T22, val _23: T23, val _24: T24, val _25: T25, val _26: T26): Serializable
 fun <T1> c(_1: T1) = Arity1(_1)
 fun <T1, T2> c(_1: T1, _2: T2) = Arity2(_1, _2)
 fun <T1, T2, T3> c(_1: T1, _2: T2, _3: T3) = Arity3(_1, _2, _3)
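
The Serializable marker matters once arities are used as RDD elements, since Spark serializes a parallelized local collection with Java serialization; a sketch (not part of the patch), reusing the c(...) arity constructors shown above:

import org.jetbrains.kotlinx.spark.api.c
import org.jetbrains.kotlinx.spark.api.withSpark

// Sketch only: works because Arity2 instances produced by c(...) are now java.io.Serializable
// and can therefore be parallelized before being converted to a Dataset.
fun main() {
    withSpark {
        sc.parallelize(listOf(c(1.0, 1), c(2.0, 2)))
            .toDS()   // JavaRDD<Arity2<Double, Int>> -> Dataset<Arity2<Double, Int>>
            .show()
    }
}
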
diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
index ed784b13..06c5628f 100644
--- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
+++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt
@@ -21,6 +21,11 @@ import ch.tutteli.atrium.api.fluent.en_GB.*
 import ch.tutteli.atrium.api.verbs.expect
 import io.kotest.core.spec.style.ShouldSpec
 import io.kotest.matchers.shouldBe
+import org.apache.spark.api.java.JavaDoubleRDD
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.functions.*
 import org.apache.spark.sql.streaming.GroupState
@@ -593,10 +598,73 @@
                     it.nullable() shouldBe true
                 }
             }
+            should("Convert Scala RDD to Dataset") {
+                val rdd0: RDD<Int> = sc.parallelize(
+                    listOf(1, 2, 3, 4, 5, 6)
+                ).rdd()
+                val dataset0: Dataset<Int> = rdd0.toDS()
+
+                dataset0.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+            }
+
+            should("Convert a JavaRDD to a Dataset") {
+                val rdd1: JavaRDD<Int> = sc.parallelize(
+                    listOf(1, 2, 3, 4, 5, 6)
+                )
+                val dataset1: Dataset<Int> = rdd1.toDS()
+
+                dataset1.toList<Int>() shouldBe listOf(1, 2, 3, 4, 5, 6)
+            }
+            should("Convert JavaDoubleRDD to Dataset") {
+
+                // JavaDoubleRDD
+                val rdd2: JavaDoubleRDD = sc.parallelizeDoubles(
+                    listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
+                )
+                val dataset2: Dataset<Double> = rdd2.toDS()
+
+                dataset2.toList<Double>() shouldBe listOf(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
+            }
+            should("Convert JavaPairRDD to Dataset") {
+                val rdd3: JavaPairRDD<Int, Double> = sc.parallelizePairs(
+                    listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
+                )
+                val dataset3: Dataset<Tuple2<Int, Double>> = rdd3.toDS()
+
+                dataset3.toList<Tuple2<Int, Double>>() shouldBe listOf(Tuple2(1, 1.0), Tuple2(2, 2.0), Tuple2(3, 3.0))
+            }
+            should("Convert Kotlin Serializable data class RDD to Dataset") {
+                val rdd4 = sc.parallelize(
+                    listOf(SomeClass(intArrayOf(1, 2), 0))
+                )
+                val dataset4 = rdd4.toDS()
+
+                dataset4.toList<SomeClass>().first().let { (a, b) ->
+                    a contentEquals intArrayOf(1, 2) shouldBe true
+                    b shouldBe 0
+                }
+            }
+            should("Convert Arity RDD to Dataset") {
+                val rdd5 = sc.parallelize(
+                    listOf(c(1.0, 4))
+                )
+                val dataset5 = rdd5.toDS()
+
+                dataset5.toList<Arity2<Double, Int>>() shouldBe listOf(c(1.0, 4))
+            }
+            should("Convert List RDD to Dataset") {
+                val rdd6 = sc.parallelize(
+                    listOf(listOf(1, 2, 3), listOf(4, 5, 6))
+                )
+                val dataset6 = rdd6.toDS()
+
+                dataset6.toList<List<Int>>() shouldBe listOf(listOf(1, 2, 3), listOf(4, 5, 6))
+            }
         }
     }
 })
+
 
 data class DataClassWithTuple<T>(val tuple: T)
 
 data class LonLat(val lon: Double, val lat: Double)
@@ -626,5 +694,5 @@ data class ComplexEnumDataClass(
 
 data class NullFieldAbleDataClass(
     val optionList: List<Int>?,
-    val optionMap: Map<String, Int>?
+    val optionMap: Map<String, Int>?,
 )
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 47043737..2ced5eb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@
         3.0.0-M5
         1.6.8
         4.5.6
+        <kotlin.code.style>official</kotlin.code.style>