diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e8024b45..63d49c25 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,5 +25,12 @@ jobs: key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Build with Maven - run: ./mvnw -B package --file pom.xml -Pscala-2.12 + run: ./mvnw -B package --file pom.xml -Pscala-2.12 -Dkotest.tags="!Kafka" + qodana: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: 'Qodana Scan' + uses: JetBrains/qodana-action@v5.0.2 + # vim: ts=2:sts=2:sw=2:expandtab diff --git a/README.md b/README.md index bd86267f..bd227403 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache - [Column infix/operator functions](#column-infixoperator-functions) - [Overload Resolution Ambiguity](#overload-resolution-ambiguity) - [Tuples](#tuples) + - [Streaming](#streaming) - [Examples](#examples) - [Reporting issues/Support](#reporting-issuessupport) - [Code of Conduct](#code-of-conduct) @@ -267,6 +268,48 @@ Finally, all these tuple helper functions are also baked in: - `map` - `cast` +### Streaming + +A popular Spark extension is [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html). +The Kotlin Spark API also offers a more Kotlin-esque way to write your streaming programs. +There are examples for use with a checkpoint, Kafka, and SQL in the [examples module](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming). + +Here is a quick example: +```kotlin +// Automatically provides ssc: JavaStreamingContext which starts and awaits termination or timeout +withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession + + // create an input stream from, for instance, Netcat: `$ nc -lk 9999` + val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999) + + // split the input stream on spaces + val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() } + + // perform an action on each RDD formed in the stream + words.foreachRDD { rdd: JavaRDD<String>, _: Time -> + + // to convert the JavaRDD to a Dataset, we need a Spark session using the RDD's context + withSpark(rdd) { // this: KSparkSession + val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS() + dataframe + .groupByKey { it.word } + .count() + .show() + // +-----+--------+ + // | key|count(1)| + // +-----+--------+ + // |hello| 1| + // | is| 1| + // | a| 1| + // | this| 1| + // | test| 3| + // +-----+--------+ + } + } +} +``` + + ## Examples For more, check out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module. 
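For reference, the snippet below is a minimal, self-contained sketch (not part of the patch itself) showing how the pieces introduced in this change compose: `withSparkStreaming` for session management, the `X` tuple builder, and the new `reduceByKey` extension on `JavaDStream<Tuple2<K, V>>`. It assumes the streaming artifacts added in this change are on the classpath and, like the README example, reads from a local Netcat socket (`$ nc -lk 9999`).

```kotlin
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaDStream
import org.jetbrains.kotlinx.spark.api.reduceByKey
import org.jetbrains.kotlinx.spark.api.tuples.X
import org.jetbrains.kotlinx.spark.api.withSparkStreaming
import scala.Tuple2

fun main() = withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    // read lines from the Netcat socket and split them into words
    val lines = ssc.socketTextStream("localhost", 9999)
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // pair each word with 1 using the tuple builder, then sum the counts per word within each batch
    val wordCounts: JavaDStream<Tuple2<String, Int>> = words
        .map { it X 1 }
        .reduceByKey { a, b -> a + b }

    // print the first elements of each batch's counts to the console
    wordCounts.print()
}
```

This mirrors the KotlinDirectKafkaWordCount example further down in the patch, just with a socket source instead of Kafka, so it can be run without a broker.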
diff --git a/examples/pom-3.2_2.12.xml b/examples/pom-3.2_2.12.xml index 5f214b69..c3b95b0e 100644 --- a/examples/pom-3.2_2.12.xml +++ b/examples/pom-3.2_2.12.xml @@ -29,6 +29,11 @@ spark-streaming_${scala.compat.version} ${spark3.version} + + org.apache.spark + spark-streaming-kafka-0-10_${scala.compat.version} + ${spark3.version} + @@ -90,6 +95,14 @@ true + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Broadcasting.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Broadcasting.kt index 2e5914e3..9612b350 100644 --- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Broadcasting.kt +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Broadcasting.kt @@ -21,7 +21,6 @@ package org.jetbrains.kotlinx.spark.examples import org.jetbrains.kotlinx.spark.api.broadcast import org.jetbrains.kotlinx.spark.api.map -import org.jetbrains.kotlinx.spark.api.sparkContext import org.jetbrains.kotlinx.spark.api.withSpark import java.io.Serializable diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt index 0fc2517f..fc0a2888 100644 --- a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt @@ -19,13 +19,14 @@ */ package org.jetbrains.kotlinx.spark.examples -import org.apache.spark.api.java.function.ReduceFunction import org.apache.spark.sql.Dataset import org.jetbrains.kotlinx.spark.api.* import org.jetbrains.kotlinx.spark.api.tuples.* -import scala.* +import scala.Tuple2 +import scala.Tuple3 data class Q(val id: Int, val text: T) + @Suppress("RedundantLambdaArrow", "UsePropertyAccessSyntax") object Main { diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinDirectKafkaWordCount.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinDirectKafkaWordCount.kt new file mode 100644 index 00000000..476815e2 --- /dev/null +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinDirectKafkaWordCount.kt @@ -0,0 +1,114 @@ +/*- + * =LICENSE= + * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.examples.streaming + +import org.apache.kafka.clients.consumer.ConsumerConfig.* +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.streaming.api.java.JavaInputDStream +import org.apache.spark.streaming.kafka010.ConsumerStrategies +import org.apache.spark.streaming.kafka010.KafkaUtils +import org.apache.spark.streaming.kafka010.LocationStrategies +import org.jetbrains.kotlinx.spark.api.reduceByKey +import org.jetbrains.kotlinx.spark.api.tuples.* +import org.jetbrains.kotlinx.spark.api.withSparkStreaming +import scala.Tuple2 +import java.io.Serializable +import java.util.regex.Pattern +import kotlin.system.exitProcess + + +/** + * Src: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java + * + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: JavaDirectKafkaWordCount + * is a list of one or more Kafka brokers + * is a consumer group name to consume from topics + * is a list of one or more kafka topics to consume from + * + * Example: + * + * First make sure you have a Kafka producer running. For instance, when running locally: + * $ kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 + * + * Then start the program normally or like this: + * $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \ + * consumer-group topic1,topic2 + */ +object KotlinDirectKafkaWordCount { + + private val SPACE = Pattern.compile(" ") + + private const val DEFAULT_BROKER = "localhost:9092" + private const val DEFAULT_GROUP_ID = "consumer-group" + private const val DEFAULT_TOPIC = "quickstart-events" + + @JvmStatic + fun main(args: Array) { + if (args.size < 3 && args.isNotEmpty()) { + System.err.println( + """Usage: JavaDirectKafkaWordCount + is a list of one or more Kafka brokers + is a consumer group name to consume from topics + is a list of one or more kafka topics to consume from + """.trimIndent() + ) + exitProcess(1) + } + + val brokers: String = args.getOrElse(0) { DEFAULT_BROKER } + val groupId: String = args.getOrElse(1) { DEFAULT_GROUP_ID } + val topics: String = args.getOrElse(2) { DEFAULT_TOPIC } + + // Create context with a 2 seconds batch interval + withSparkStreaming(batchDuration = Durations.seconds(2), appName = "KotlinDirectKafkaWordCount") { + + val topicsSet: Set = topics.split(',').toSet() + + val kafkaParams: Map = mapOf( + BOOTSTRAP_SERVERS_CONFIG to brokers, + GROUP_ID_CONFIG to groupId, + KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ) + + // Create direct kafka stream with brokers and topics + val messages: JavaInputDStream> = KafkaUtils.createDirectStream( + ssc, + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(topicsSet, kafkaParams), + ) + + // Get the lines, split them into words, count the words and print + val lines: JavaDStream = messages.map { it.value() } + val words: JavaDStream = lines.flatMap { it.split(SPACE).iterator() } + + val wordCounts: JavaDStream> = words + .map { it X 1 } + .reduceByKey { a: Int, b: Int -> a + b } + + wordCounts.print() + + } + } +} diff --git 
a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinRecoverableNetworkWordCount.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinRecoverableNetworkWordCount.kt new file mode 100644 index 00000000..32db58f7 --- /dev/null +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinRecoverableNetworkWordCount.kt @@ -0,0 +1,225 @@ +/*- + * =LICENSE= + * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =LICENSEEND= + */ +@file:OptIn(ExperimentalTime::class) + +package org.jetbrains.kotlinx.spark.examples.streaming + +import com.google.common.io.Files +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.Time +import org.apache.spark.util.LongAccumulator +import org.jetbrains.kotlinx.spark.api.* +import org.jetbrains.kotlinx.spark.api.tuples.* +import java.io.File +import java.nio.charset.Charset +import java.util.regex.Pattern +import kotlin.system.exitProcess +import kotlin.time.ExperimentalTime + + +/** + * Use this singleton to get or register a Broadcast variable. + */ +internal object KotlinWordExcludeList { + + @Volatile + private var instance: Broadcast>? = null + + fun getInstance(sc: JavaSparkContext): Broadcast> { + if (instance == null) synchronized(KotlinWordExcludeList::class) { + if (instance == null) { + val wordExcludeList = listOf("a", "b", "c") + instance = sc.broadcast(wordExcludeList) + } + } + return instance!! + } +} + +/** + * Use this singleton to get or register an Accumulator. + */ +internal object KotlinDroppedWordsCounter { + + @Volatile + private var instance: LongAccumulator? = null + + fun getInstance(sc: JavaSparkContext): LongAccumulator { + if (instance == null) synchronized(KotlinDroppedWordsCounter::class) { + if (instance == null) + instance = sc.sc().longAccumulator("DroppedWordsCounter") + } + return instance!! + } +} + +/** + * Src: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java + * + * Counts words in text encoded with UTF8 received from the network every second. This example also + * shows how to use lazily instantiated singleton instances for Accumulator and Broadcast so that + * they can be registered on driver failures. + * + * Usage: KotlinRecoverableNetworkWordCount + * and describe the TCP server that Spark Streaming would connect to receive + * data. 
directory to HDFS-compatible file system which checkpoint data + * file to which the word counts will be appended + * + * and must be absolute paths + * + * To run this on your local machine, you need to first run a Netcat server + * + * `$ nc -lk 9999` + * + * and run the example as + * + * `$ ./bin/run-example org.apache.spark.examples.streaming.KotlinRecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint/ ~/out` + * + * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create + * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if + * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from + * the checkpoint data. + * + * Refer to the online documentation for more details. + */ +object KotlinRecoverableNetworkWordCount { + + private val SPACE = Pattern.compile(" ") + + private const val DEFAULT_IP = "localhost" + private const val DEFAULT_PORT = "9999" + private const val DEFAULT_CHECKPOINT_DIRECTORY = "~/checkpoint/" + private const val DEFAULT_OUTPUT_PATH = "~/out" + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + if (args.size != 4 && args.isNotEmpty()) { + System.err.println("You arguments were " + listOf(*args)) + System.err.println( + """Usage: KotlinRecoverableNetworkWordCount + . and describe the TCP server that Spark + Streaming would connect to receive data. directory to + HDFS-compatible file system which checkpoint data file to which + the word counts will be appended + + In local mode, should be 'local[n]' with n > 1 + Both and must be absolute paths""".trimIndent() + ) + exitProcess(1) + } + val ip = args.getOrElse(0) { DEFAULT_IP } + val port = args.getOrElse(1) { DEFAULT_PORT }.toInt() + val checkpointDirectory = args.getOrElse(2) { DEFAULT_CHECKPOINT_DIRECTORY } + val outputPath = args.getOrElse(3) { DEFAULT_OUTPUT_PATH } + + + // (used to detect the new context) + // Create the context with a 1 second batch size or load from checkpointDirectory + withSparkStreaming( + checkpointPath = checkpointDirectory, + batchDuration = Durations.seconds(1), + appName = "KotlinRecoverableNetworkWordCount", + ) { + createContext( + ip = ip, + port = port, + outputPath = outputPath, + ) + } + } + + private fun KSparkStreamingSession.createContext( + ip: String, + port: Int, + outputPath: String, + ) { + // If you do not see this printed, that means the StreamingContext has been loaded + // from the new checkpoint + println("Creating new context") + val outputFile = File(outputPath).apply { + if (exists()) delete() + parentFile.mkdirs() + createNewFile() + } + + // Create a socket stream on target ip:port and count the + // words in input stream of \n delimited text (e.g. 
generated by 'nc') + val lines = ssc.socketTextStream(ip, port) + val words = lines.flatMap { it.split(SPACE).iterator() } + val wordCounts = words + .map { t(it, 1) } + .reduceByKey { a, b -> a + b } + + // in normal streaming context we can create a SparkSession from ssc: JavaStreamingContext + // normally `ssc.sparkContext().conf` + withSpark(ssc) { + listOf(1, 2, 3).toDS().show() + } + + setRunAfterStart { + println("Context is created and started running!") + } + + wordCounts.foreachRDD { rdd, time: Time -> + // but in foreachRDD we must obtain this conf from the RDD + // like `rdd.context().conf` + withSpark(rdd) { + + rdd.toDS().show() + + // Get or register the excludeList Broadcast + val excludeList = KotlinWordExcludeList.getInstance(sc) + + // Get or register the droppedWordsCounter Accumulator + val droppedWordsCounter = KotlinDroppedWordsCounter.getInstance(sc) + + // Use excludeList to drop words and use droppedWordsCounter to count them + val counts = rdd.filter { (word, count) -> + if (excludeList.value().contains(word)) { + droppedWordsCounter.add(count.toLong()) + false + } else { + true + } + }.collect() + + + val output = "Counts at time $time $counts" + println(output) + println("Dropped ${droppedWordsCounter.value()} word(s) totally") + println("Appending to " + outputFile.absolutePath) + + @Suppress("UnstableApiUsage") + Files.append( + """ + $output + + """.trimIndent(), + outputFile, + Charset.defaultCharset(), + ) + } + } + } +} diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinSqlNetworkWordCount.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinSqlNetworkWordCount.kt new file mode 100644 index 00000000..51060a20 --- /dev/null +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinSqlNetworkWordCount.kt @@ -0,0 +1,99 @@ +/*- + * =LICENSE= + * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.examples.streaming + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.StorageLevels +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.Time +import org.jetbrains.kotlinx.spark.api.withSparkStreaming +import java.io.Serializable +import java.util.regex.Pattern +import kotlin.system.exitProcess + + +/** + * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the + * network every second. + * + * Usage: KotlinSqlNetworkWordCount + * and describe the TCP server that Spark Streaming would connect to receive data. 
+ * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example org.apache.spark.examples.streaming.KotlinSqlNetworkWordCount localhost 9999` + */ +object KotlinSqlNetworkWordCount { + + private val SPACE = Pattern.compile(" ") + + private const val DEFAULT_IP = "localhost" + private const val DEFAULT_PORT = "9999" + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + if (args.size < 2 && args.isNotEmpty()) { + System.err.println("Usage: KotlinNetworkWordCount ") + exitProcess(1) + } + + // Create the context with a 1 second batch size + withSparkStreaming( + batchDuration = Durations.seconds(1), + appName = "KotlinSqlNetworkWordCount", + ) { + + + // Create a KotlinReceiverInputDStream on target ip:port and count the + // words in input stream of \n delimited text (e.g. generated by 'nc') + // Note that no duplication in storage level only for running locally. + // Replication necessary in distributed scenario for fault tolerance. + val lines = ssc.socketTextStream( + args.getOrElse(0) { DEFAULT_IP }, + args.getOrElse(1) { DEFAULT_PORT }.toInt(), + StorageLevels.MEMORY_AND_DISK_SER, + ) + val words = lines.flatMap { it.split(SPACE).iterator() } + + // Convert RDDs of the words DStream to DataFrame and run SQL query + words.foreachRDD { rdd: JavaRDD, time: Time -> + withSpark(rdd) { + + // Convert JavaRDD to JavaRDD to DataFrame (Dataset) + val rowRDD = rdd.map(::KotlinRecord) + val wordsDataFrame = rowRDD.toDF() + + // Creates a temporary view using the DataFrame + wordsDataFrame.createOrReplaceTempView("words") + + // Do word count on table using SQL and print it + val wordCountsDataFrame = + spark.sql("select word, count(*) as total from words group by word") + println("========= $time=========") + wordCountsDataFrame.show() + } + } + } + } +} + +data class KotlinRecord(val word: String): Serializable diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinStatefulNetworkCount.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinStatefulNetworkCount.kt new file mode 100644 index 00000000..2c938ead --- /dev/null +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/KotlinStatefulNetworkCount.kt @@ -0,0 +1,104 @@ +/*- + * =LICENSE= + * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.examples.streaming + +import org.apache.spark.api.java.Optional +import org.apache.spark.api.java.StorageLevels +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.State +import org.apache.spark.streaming.StateSpec +import org.jetbrains.kotlinx.spark.api.getOrElse +import org.jetbrains.kotlinx.spark.api.mapWithState +import org.jetbrains.kotlinx.spark.api.toPairRDD +import org.jetbrains.kotlinx.spark.api.tuples.X +import org.jetbrains.kotlinx.spark.api.withSparkStreaming +import java.util.regex.Pattern +import kotlin.system.exitProcess + + +/** + * Src: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java + * + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every + * second starting with initial value of word count. + * Usage: JavaStatefulNetworkWordCount + * and describe the TCP server that Spark Streaming would connect to receive + * data. + * + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example + * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999` */ +object KotlinStatefulNetworkCount { + + private val SPACE = Pattern.compile(" ") + + private const val DEFAULT_HOSTNAME = "localhost" + private const val DEFAULT_PORT = "9999" + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + if (args.size < 2 && args.isNotEmpty()) { + System.err.println("Usage: JavaStatefulNetworkWordCount ") + exitProcess(1) + } + + // Create the context with a 1 second batch size + withSparkStreaming( + batchDuration = Durations.seconds(1), + checkpointPath = ".", + appName = "JavaStatefulNetworkWordCount", + ) { + + // Initial state RDD input to mapWithState + val tuples = listOf("hello" X 1, "world" X 1) + val initialRDD = ssc.sparkContext().parallelize(tuples) + + val lines = ssc.socketTextStream( + args.getOrElse(0) { DEFAULT_HOSTNAME }, + args.getOrElse(1) { DEFAULT_PORT }.toInt(), + StorageLevels.MEMORY_AND_DISK_SER_2, + ) + val words = lines.flatMap { it.split(SPACE).iterator() } + + val wordsDstream = words.map { it X 1 } + + // Update the cumulative count function + val mappingFunc = { word: String, one: Optional, state: State -> + val sum = one.getOrElse(0) + state.getOrElse(0) + val output = word X sum + state.update(sum) + output + } + + // DStream made of get cumulative counts that get updated in every batch + val stateDstream = wordsDstream.mapWithState( + StateSpec + .function(mappingFunc) + .initialState(initialRDD.toPairRDD()) + ) + + stateDstream.print() + } + } +} diff --git a/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/Streaming.kt b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/Streaming.kt new file mode 100644 index 00000000..85db9775 --- /dev/null +++ b/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/Streaming.kt @@ -0,0 +1,53 @@ +/*- + * =LICENSE= + * Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.examples.streaming + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.sql.Dataset +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.Time +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream +import org.jetbrains.kotlinx.spark.api.* + +data class TestRow( + val word: String, +) + +/** + * To run this on your local machine, you need to first run a Netcat server + * + * `$ nc -lk 9999` + */ +fun main() = withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession + + val lines: JavaReceiverInputDStream = ssc.socketTextStream("localhost", 9999) + val words: JavaDStream = lines.flatMap { it.split(" ").iterator() } + + words.foreachRDD { rdd: JavaRDD, _: Time -> + withSpark(rdd) { // this: KSparkSession + val dataframe: Dataset = rdd.map { TestRow(it) }.toDS() + dataframe + .groupByKey { it.word } + .count() + .show() + } + } +} \ No newline at end of file diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml index e07de9d9..99172895 100644 --- a/kotlin-spark-api/3.2/pom_2.12.xml +++ b/kotlin-spark-api/3.2/pom_2.12.xml @@ -31,6 +31,11 @@ org.jetbrains.kotlinx.spark scala-tuples-in-kotlin + + org.apache.spark + spark-streaming-kafka-0-10_${scala.compat.version} + ${spark3.version} + @@ -46,6 +51,12 @@ ${spark3.version} provided + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + @@ -57,7 +68,19 @@ io.kotest.extensions kotest-extensions-allure - ${kotest-extension-allure.version} + ${kotest-extensions-allure.version} + test + + + io.github.embeddedkafka + embedded-kafka_${scala.compat.version} + ${embedded-kafka.version} + test + + + io.kotest.extensions + kotest-extensions-testcontainers + ${kotest-extensions-testcontainers.version} test @@ -72,6 +95,19 @@ ${atrium.version} test + + org.apache.spark + spark-streaming_${scala.compat.version} + ${spark3.version} + tests + test + + + org.apache.kafka + kafka-streams-test-utils + 3.1.0 + test + @@ -141,6 +177,14 @@ org.jacoco jacoco-maven-plugin + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt index fb6b4d29..f8d90fa3 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt @@ -27,6 +27,7 @@ package org.jetbrains.kotlinx.spark.api +import org.apache.spark.api.java.Optional import scala.* import scala.collection.JavaConverters import java.util.* @@ -42,6 +43,39 @@ import scala.collection.mutable.Buffer as ScalaMutableBuffer import scala.collection.mutable.Map as ScalaMutableMap import scala.collection.mutable.Seq as ScalaMutableSeq import scala.collection.mutable.Set as ScalaMutableSet +import org.apache.spark.streaming.State + +/** 
Returns state value if it exists, else `null`. */ +fun State.getOrNull(): T? = if (exists()) get() else null + +/** Returns state value if it exists, else [other]. */ +fun State.getOrElse(other: T): T = if (exists()) get() else other + + +/** Converts Scala [Option] to Kotlin nullable. */ +fun Option.getOrNull(): T? = getOrElse(null) + +/** Get if available else [other]. */ +fun Option.getOrElse(other: T): T = getOrElse { other } + +/** Converts nullable value to Scala [Option]. */ +fun T?.toOption(): Option = Option.apply(this) + +/** Converts Scala [Option] to Java [Optional]. */ +fun Option.toOptional(): Optional = Optional.ofNullable(getOrNull()) + + +/** Converts [Optional] to Kotlin nullable. */ +fun Optional.getOrNull(): T? = orNull() + +/** Get if available else [other]. */ +fun Optional.getOrElse(other: T): T = orElse(other) + +/** Converts nullable value to [Optional]. */ +fun T?.toOptional(): Optional = Optional.ofNullable(this) + +/** Converts Java [Optional] to Scala [Option]. */ +fun Optional.toOption(): Option = Option.apply(getOrNull()) /** * @see JavaConverters.asScalaIterator for more information. diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/DataStreamWriter.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/DataStreamWriter.kt new file mode 100644 index 00000000..d0c1ece1 --- /dev/null +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/DataStreamWriter.kt @@ -0,0 +1,42 @@ +/*- + * =LICENSE= + * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.api + +import org.apache.spark.api.java.function.VoidFunction2 +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.streaming.DataStreamWriter + +/** + * :: Experimental :: + * + * (Scala-specific) Sets the output of the streaming query to be processed using the provided + * function. This is supported only in the micro-batch execution modes (that is, when the + * trigger is not continuous). In every micro-batch, the provided function will be called in + * every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier. + * The batchId can be used to deduplicate and transactionally write the output + * (that is, the provided Dataset) to external systems. The output Dataset is guaranteed + * to be exactly the same for the same batchId (assuming all operations are deterministic + * in the query). 
+ * + * @since 2.4.0 + */ +fun DataStreamWriter.forEachBatch( + func: (batch: Dataset, batchId: Long) -> Unit, +): DataStreamWriter = foreachBatch(VoidFunction2(func)) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt index 9173bf8d..71abb1ee 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt @@ -36,10 +36,7 @@ import org.apache.spark.api.java.function.ForeachPartitionFunction import org.apache.spark.api.java.function.MapFunction import org.apache.spark.api.java.function.ReduceFunction import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Column -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.KeyValueGroupedDataset -import org.apache.spark.sql.TypedColumn +import org.apache.spark.sql.* import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions import scala.Tuple2 import scala.Tuple3 @@ -78,6 +75,21 @@ inline fun RDD.toDS(spark: SparkSession): Dataset = inline fun JavaRDDLike.toDS(spark: SparkSession): Dataset = spark.createDataset(this.rdd(), encoder()) +/** + * Utility method to create Dataset (Dataframe) from JavaRDD. + * NOTE: [T] must be [Serializable]. + */ +inline fun JavaRDDLike.toDF(spark: SparkSession): Dataset = + toDS(spark).toDF() + +/** + * Utility method to create Dataset (Dataframe) from RDD. + * NOTE: [T] must be [Serializable]. + */ +inline fun RDD.toDF(spark: SparkSession): Dataset = + toDS(spark).toDF() + + /** * (Kotlin-specific) * Returns a new Dataset that contains the result of applying [func] to each element. diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt index 118abf48..27513ffc 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt @@ -22,19 +22,30 @@ * This file contains the main entry points and wrappers for the Kotlin Spark API. */ +@file:Suppress("UsePropertyAccessSyntax") + package org.jetbrains.kotlinx.spark.api + +import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaRDDLike import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset +import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.UDFRegistration +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR +import org.jetbrains.kotlinx.spark.api.tuples.* import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions +import java.io.Serializable /** * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]. @@ -61,6 +72,18 @@ class KSparkSession(val spark: SparkSession) { /** Utility method to create dataset from [JavaRDDLike]. */ inline fun JavaRDDLike.toDS(): Dataset = toDS(spark) + /** + * Utility method to create Dataset (Dataframe) from RDD. 
+ * NOTE: [T] must be [Serializable]. + */ + inline fun RDD.toDF(): Dataset = toDF(spark) + + /** + * Utility method to create Dataset (Dataframe) from JavaRDD. + * NOTE: [T] must be [Serializable]. + */ + inline fun JavaRDDLike.toDF(): Dataset = toDF(spark) + /** * A collection of methods for registering user-defined functions (UDF). * @@ -76,6 +99,72 @@ class KSparkSession(val spark: SparkSession) { val udf: UDFRegistration get() = spark.udf() } +/** + * This wrapper over [SparkSession] and [JavaStreamingContext] provides several additional methods to create [org.apache.spark.sql.Dataset] + */ +class KSparkStreamingSession(@Transient val ssc: JavaStreamingContext) : Serializable { + // Serializable and Transient so that [withSpark] works inside [foreachRDD] and other Spark functions that serialize + + private var runAfterStart: KSparkStreamingSession.() -> Unit = {} + + /** [block] will be run after the streaming session has started from a new context (so not when loading from a checkpoint) + * and before it's terminated. */ + fun setRunAfterStart(block: KSparkStreamingSession.() -> Unit) { + runAfterStart = block + } + + internal fun invokeRunAfterStart(): Unit = runAfterStart() + + + /** Creates new spark session from given [sc]. */ + fun getSpark(sc: SparkConf): SparkSession = + SparkSession + .builder() + .config(sc) + .getOrCreate() + + /** Creates new spark session from context of given JavaRDD, [rddForConf]. */ + fun getSpark(rddForConf: JavaRDDLike<*, *>): SparkSession = getSpark(rddForConf.context().conf) + + /** Creates new spark session from context of given JavaStreamingContext, [sscForConf] */ + fun getSpark(sscForConf: JavaStreamingContext): SparkSession = getSpark(sscForConf.sparkContext().conf) + + /** + * Helper function to enter Spark scope from [sc] like + * ```kotlin + * withSpark(sc) { // this: KSparkSession + * + * } + * ``` + */ + fun withSpark(sc: SparkConf, func: KSparkSession.() -> T): T = + KSparkSession(getSpark(sc)).func() + + /** + * Helper function to enter Spark scope from a provided like + * when using the `foreachRDD` function. + * ```kotlin + * withSpark(rdd) { // this: KSparkSession + * + * } + * ``` + */ + fun withSpark(rddForConf: JavaRDDLike<*, *>, func: KSparkSession.() -> T): T = + KSparkSession(getSpark(rddForConf)).func() + + /** + * Helper function to enter Spark scope from [sscForConf] like + * ```kotlin + * withSpark(ssc) { // this: KSparkSession + * + * } + * ``` + */ + fun withSpark(sscForConf: JavaStreamingContext, func: KSparkSession.() -> T): T = + KSparkSession(getSpark(sscForConf)).func() +} + + /** * The entry point to programming Spark with the Dataset and DataFrame API. * @@ -146,6 +235,7 @@ inline fun withSpark( * @param logLevel Control our logLevel. This overrides any user-defined log settings. * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) */ + @JvmOverloads inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) { builder @@ -175,6 +265,88 @@ inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func ) } + +/** + * Wrapper for spark streaming creation. `spark: SparkSession` and `ssc: JavaStreamingContext` are provided, started, + * awaited, and stopped automatically. + * The use of a checkpoint directory is optional. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. 
If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. + * + * @param batchDuration The time interval at which streaming data will be divided into batches. Defaults to 1 + * second. + * @param checkpointPath If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist (or `null` is provided), + * then the streaming context will be built using the other provided parameters. + * @param hadoopConf Only used if [checkpointPath] is given. Hadoop configuration if necessary for reading from + * any HDFS compatible file system. + * @param createOnError Only used if [checkpointPath] is given. Whether to create a new JavaStreamingContext if + * there is an error in reading checkpoint data. + * @param props Spark options, value types are runtime-checked for type-correctness. + * @param master Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to + * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster. + * By default, it tries to get the system value "spark.master", otherwise it uses "local[*]". + * @param appName Sets a name for the application, which will be shown in the Spark web UI. + * If no application name is set, a randomly generated name will be used. + * @param timeout The time in milliseconds to wait for the stream to terminate without input. -1 by default, + * this means no timeout. + * @param startStreamingContext Defaults to `true`. If set to `false`, then the streaming context will not be started. + * @param func Function which will be executed in context of [KSparkStreamingSession] (it means that + * `this` inside block will point to [KSparkStreamingSession]) + */ +@JvmOverloads +fun withSparkStreaming( + batchDuration: Duration = Durations.seconds(1L), + checkpointPath: String? = null, + hadoopConf: Configuration = SparkHadoopUtil.get().conf(), + createOnError: Boolean = false, + props: Map = emptyMap(), + master: String = SparkConf().get("spark.master", "local[*]"), + appName: String = "Kotlin Spark Sample", + timeout: Long = -1L, + startStreamingContext: Boolean = true, + func: KSparkStreamingSession.() -> Unit, +) { + + // will only be set when a new context is created + var kSparkStreamingSession: KSparkStreamingSession? = null + + val creatingFunc = { + val sc = SparkConf() + .setAppName(appName) + .setMaster(master) + .setAll( + props + .map { (key, value) -> key X value.toString() } + .asScalaIterable() + ) + + val ssc = JavaStreamingContext(sc, batchDuration) + ssc.checkpoint(checkpointPath) + + kSparkStreamingSession = KSparkStreamingSession(ssc) + func(kSparkStreamingSession!!) + + ssc + } + + val ssc = when { + checkpointPath != null -> + JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) + + else -> creatingFunc() + } + + if (startStreamingContext) { + ssc.start() + kSparkStreamingSession?.invokeRunAfterStart() + } + ssc.awaitTerminationOrTimeout(timeout) + ssc.stop() +} + + /** * Broadcast a read-only variable to the cluster, returning a * [org.apache.spark.broadcast.Broadcast] object for reading it in distributed functions. 
@@ -207,5 +379,4 @@ inline fun SparkContext.broadcast(value: T): Broadcast = try { broadcast(value, encoder().clsTag()) } catch (e: ClassNotFoundException) { JavaSparkContext(this).broadcast(value) -} - +} \ No newline at end of file diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt new file mode 100644 index 00000000..8664081b --- /dev/null +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt @@ -0,0 +1,676 @@ +/*- + * =LICENSE= + * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =LICENSEEND= + */ +@file:Suppress("unused") + +package org.jetbrains.kotlinx.spark.api + +import org.apache.spark.HashPartitioner +import org.apache.spark.Partitioner +import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.Optional +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.StateSpec +import org.apache.spark.streaming.api.java.JavaDStream +import org.apache.spark.streaming.api.java.JavaMapWithStateDStream +import org.apache.spark.streaming.api.java.JavaPairDStream +import scala.Tuple2 + + +fun JavaDStream>.toPairDStream(): JavaPairDStream = + JavaPairDStream.fromJavaDStream(this) + +fun JavaPairDStream.toTupleDStream(): JavaDStream> = + toJavaDStream() + +fun JavaRDD>.toPairRDD(): JavaPairRDD = + JavaPairRDD.fromJavaRDD(this) + +fun JavaPairRDD.toTupleRDD(): JavaRDD> = + JavaPairRDD.toRDD(this).toJavaRDD() + + +/** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ +fun JavaDStream>.groupByKey( + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), +): JavaDStream>> = + toPairDStream() + .groupByKey(numPartitions) + .toTupleDStream() + +/** + * Return a new DStream by applying `groupByKey` on each RDD. The supplied + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. + */ +fun JavaDStream>.groupByKey(partitioner: Partitioner): JavaDStream>> = + toPairDStream() + .groupByKey(partitioner) + .toTupleDStream() + +/** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ +fun JavaDStream>.reduceByKey( + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKey(reduceFunc, numPartitions) + .toTupleDStream() + +/** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. 
org.apache.spark.Partitioner is used to control + * the partitioning of each RDD. + */ +fun JavaDStream>.reduceByKey( + partitioner: Partitioner, + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKey(reduceFunc, partitioner) + .toTupleDStream() + +/** + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in + * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information. + */ +fun JavaDStream>.combineByKey( + createCombiner: (V) -> C, + mergeValue: (C, V) -> C, + mergeCombiner: (C, C) -> C, + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + mapSideCombine: Boolean = true, +): JavaDStream> = + toPairDStream() + .combineByKey(createCombiner, mergeValue, mergeCombiner, HashPartitioner(numPartitions), mapSideCombine) + .toTupleDStream() + +/** + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in + * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information. + */ +fun JavaDStream>.combineByKey( + createCombiner: (V) -> C, + mergeValue: (C, V) -> C, + mergeCombiner: (C, C) -> C, + partitioner: Partitioner, + mapSideCombine: Boolean = true, +): JavaDStream> = + toPairDStream() + .combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + .toTupleDStream() + +/** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream; if not specified + * then Spark's default number of partitions will be used + */ +fun JavaDStream>.groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), +): JavaDStream>> = + toPairDStream() + .groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) + .toTupleDStream() + +/** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. + */ +fun JavaDStream>.groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + partitioner: Partitioner, +): JavaDStream>> = + toPairDStream() + .groupByKeyAndWindow(windowDuration, slideDuration, partitioner) + .toTupleDStream() + +/** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. 
Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative and commutative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ +fun JavaDStream>.reduceByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) + .toTupleDStream() + +/** + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. + * @param reduceFunc associative and commutative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD + * in the new DStream. + */ +fun JavaDStream>.reduceByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + partitioner: Partitioner, + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) + .toTupleDStream() + +/** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative and commutative reduce function + * @param invReduceFunc inverse reduce function; such that for all y, invertible x: + * `invReduceFunc(reduceFunc(x, y), x) = y` + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ +fun JavaDStream>.reduceByKeyAndWindow( + invReduceFunc: (V, V) -> V, + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + filterFunc: ((Tuple2) -> Boolean)? 
= null, + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKeyAndWindow( + /* reduceFunc = */ reduceFunc, + /* invReduceFunc = */ invReduceFunc, + /* windowDuration = */ windowDuration, + /* slideDuration = */ slideDuration, + /* numPartitions = */ numPartitions, + /* filterFunc = */ filterFunc?.let { + { tuple -> + filterFunc(tuple) + } + } + ) + .toTupleDStream() + +/** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative and commutative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ +fun JavaDStream>.reduceByKeyAndWindow( + invReduceFunc: (V, V) -> V, + windowDuration: Duration, + slideDuration: Duration = dstream().slideDuration(), + partitioner: Partitioner, + filterFunc: ((Tuple2) -> Boolean)? = null, + reduceFunc: (V, V) -> V, +): JavaDStream> = + toPairDStream() + .reduceByKeyAndWindow( + /* reduceFunc = */ reduceFunc, + /* invReduceFunc = */ invReduceFunc, + /* windowDuration = */ windowDuration, + /* slideDuration = */ slideDuration, + /* partitioner = */ partitioner, + /* filterFunc = */ filterFunc?.let { + { tuple -> + filterFunc(tuple) + } + } + ) + .toTupleDStream() + +/** + * Return a [JavaMapWithStateDStream] by applying a function to every key-value element of + * `this` stream, while maintaining some state data for each unique key. The mapping function + * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this + * transformation can be specified using `StateSpec` class. The state data is accessible in + * as a parameter of type `State` in the mapping function. 
+ * + * Example of using `mapWithState`: + * ```kotlin + * // A mapping function that maintains an integer state and return a String + * fun mappingFunction(key: String, value: Optional, state: State): Optional { + * // Use state.exists(), state.get(), state.update() and state.remove() + * // to manage state, and return the necessary string + * } + * + * val spec = StateSpec.function(::mappingFunction).numPartitions(10) + * + * val mapWithStateDStream = keyValueDStream.mapWithState(spec) + * ``` + * + * @param spec Specification of this transformation + * @tparam StateType Class type of the state data + * @tparam MappedType Class type of the mapped data + */ +fun JavaDStream>.mapWithState( + spec: StateSpec, +): JavaMapWithStateDStream = + toPairDStream().mapWithState(spec) + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * In every batch the updateFunc will be called for each state even if there are no new values. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. If `this` function returns `null`, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ +@JvmName("updateStateByKeyNullable") +fun JavaDStream>.updateStateByKey( + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + updateFunc: (List, S?) -> S?, +): JavaDStream> = + toPairDStream() + .updateStateByKey( + { list: List, s: Optional -> + updateFunc(list, s.getOrNull()).toOptional() + }, + numPartitions, + ) + .toTupleDStream() + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * In every batch the updateFunc will be called for each state even if there are no new values. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. If `this` function returns `null`, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ +@JvmName("updateStateByKey") +fun JavaDStream>.updateStateByKey( + numPartitions: Int = dstream().ssc().sc().defaultParallelism(), + updateFunc: (List, Optional) -> Optional, +): JavaDStream> = + toPairDStream() + .updateStateByKey( + updateFunc, + numPartitions, + ) + .toTupleDStream() + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * In every batch the updateFunc will be called for each state even if there are no new values. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. Note, that this function may generate a different + * tuple with a different key than the input key. Therefore keys may be removed + * or added in this way. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. 
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream + * @tparam S State type + */ +@JvmName("updateStateByKeyNullable") +fun JavaDStream>.updateStateByKey( + partitioner: Partitioner, + updateFunc: (List, S?) -> S?, +): JavaDStream> = + toPairDStream() + .updateStateByKey( + { list: List, s: Optional -> + updateFunc(list, s.getOrNull()).toOptional() + }, + partitioner, + ) + .toTupleDStream() + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * In every batch the updateFunc will be called for each state even if there are no new values. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. Note, that this function may generate a different + * tuple with a different key than the input key. Therefore keys may be removed + * or added in this way. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream + * @tparam S State type + */ +fun JavaDStream>.updateStateByKey( + partitioner: Partitioner, + updateFunc: (List, Optional) -> Optional, +): JavaDStream> = + toPairDStream() + .updateStateByKey( + updateFunc, + partitioner, + ) + .toTupleDStream() + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. If `this` function returns `null`, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. + * @param initialRDD initial state value of each key. + * @tparam S State type + */ +@JvmName("updateStateByKeyNullable") +fun JavaDStream>.updateStateByKey( + partitioner: Partitioner, + initialRDD: JavaRDD>, + updateFunc: (List, S?) -> S?, +): JavaDStream> = + toPairDStream() + .updateStateByKey( + { list: List, s: Optional -> + updateFunc(list, s.getOrNull()).toOptional() + }, + partitioner, + initialRDD.toPairRDD(), + ) + .toTupleDStream() + +/** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. + * Note: Needs checkpoint directory to be set. + * @param updateFunc State update function. If `this` function returns `null`, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. + * @param initialRDD initial state value of each key. 
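+ *
+ * A minimal sketch of seeding the running state from an initial RDD; the names (`pairs`, the seed
+ * values) and the use of `org.apache.spark.HashPartitioner` are illustrative only, and it assumes a
+ * checkpoint directory is set and `pairs` is a `JavaDStream<Tuple2<String, Int>>` inside a
+ * `KSparkStreamingSession`:
+ * ```kotlin
+ * withSpark(ssc) {
+ *     // seed the count for "hello" with 10 before the first batch arrives
+ *     val initial: JavaRDD<Tuple2<String, Int>> = sc.parallelize(listOf(t("hello", 10)))
+ *
+ *     val totals = pairs.updateStateByKey(
+ *         partitioner = HashPartitioner(2),
+ *         initialRDD = initial,
+ *     ) { newValues: List<Int>, state: Optional<Int> ->
+ *         // add this batch's counts to whatever was stored for the key so far
+ *         Optional.of(state.orElse(0) + newValues.sum())
+ *     }
+ *     totals.print()
+ * }
+ * ```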
+ * @tparam S State type
+ */
+fun <K, V, S> JavaDStream<Tuple2<K, V>>.updateStateByKey(
+    partitioner: Partitioner,
+    initialRDD: JavaRDD<Tuple2<K, S>>,
+    updateFunc: (List<V>, Optional<S>) -> Optional<S>,
+): JavaDStream<Tuple2<K, S>> =
+    toPairDStream()
+        .updateStateByKey(
+            updateFunc,
+            partitioner,
+            initialRDD.toPairRDD(),
+        )
+        .toTupleDStream()
+
+
+/**
+ * Return a new DStream by applying a map function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
+fun <K, V, U> JavaDStream<Tuple2<K, V>>.mapValues(
+    mapValuesFunc: (V) -> U,
+): JavaDStream<Tuple2<K, U>> =
+    toPairDStream()
+        .mapValues(mapValuesFunc)
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying a flatMap function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
+fun <K, V, U> JavaDStream<Tuple2<K, V>>.flatMapValues(
+    flatMapValuesFunc: (V) -> Iterator<U>,
+): JavaDStream<Tuple2<K, U>> =
+    toPairDStream()
+        .flatMapValues(flatMapValuesFunc)
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.cogroup(
+    other: JavaDStream<Tuple2<K, W>>,
+    numPartitions: Int = dstream().ssc().sc().defaultParallelism(),
+): JavaDStream<Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>>> =
+    toPairDStream()
+        .cogroup(
+            other.toPairDStream(),
+            numPartitions,
+        )
+        .toTupleDStream()
+
+
+/**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.cogroup(
+    other: JavaDStream<Tuple2<K, W>>,
+    partitioner: Partitioner,
+): JavaDStream<Tuple2<K, Tuple2<Iterable<V>, Iterable<W>>>> =
+    toPairDStream()
+        .cogroup(
+            other.toPairDStream(),
+            partitioner,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.join(
+    other: JavaDStream<Tuple2<K, W>>,
+    numPartitions: Int = dstream().ssc().sc().defaultParallelism(),
+): JavaDStream<Tuple2<K, Tuple2<V, W>>> =
+    toPairDStream()
+        .join(
+            other.toPairDStream(),
+            numPartitions,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.join(
+    other: JavaDStream<Tuple2<K, W>>,
+    partitioner: Partitioner,
+): JavaDStream<Tuple2<K, Tuple2<V, W>>> =
+    toPairDStream()
+        .join(
+            other.toPairDStream(),
+            partitioner,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.leftOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    numPartitions: Int = dstream().ssc().sc().defaultParallelism(),
+): JavaDStream<Tuple2<K, Tuple2<V, Optional<W>>>> =
+    toPairDStream()
+        .leftOuterJoin(
+            other.toPairDStream(),
+            numPartitions,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.leftOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    partitioner: Partitioner,
+): JavaDStream<Tuple2<K, Tuple2<V, Optional<W>>>> =
+    toPairDStream()
+        .leftOuterJoin(
+            other.toPairDStream(),
+            partitioner,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.rightOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    numPartitions: Int = dstream().ssc().sc().defaultParallelism(),
+): JavaDStream<Tuple2<K, Tuple2<Optional<V>, W>>> =
+    toPairDStream()
+        .rightOuterJoin(
+            other.toPairDStream(),
+            numPartitions,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.rightOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    partitioner: Partitioner,
+): JavaDStream<Tuple2<K, Tuple2<Optional<V>, W>>> =
+    toPairDStream()
+        .rightOuterJoin(
+            other.toPairDStream(),
+            partitioner,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.fullOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    numPartitions: Int = dstream().ssc().sc().defaultParallelism(),
+): JavaDStream<Tuple2<K, Tuple2<Optional<V>, Optional<W>>>> =
+    toPairDStream()
+        .fullOuterJoin(
+            other.toPairDStream(),
+            numPartitions,
+        )
+        .toTupleDStream()
+
+/**
+ * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
+ * the partitioning of each RDD.
+ */
+fun <K, V, W> JavaDStream<Tuple2<K, V>>.fullOuterJoin(
+    other: JavaDStream<Tuple2<K, W>>,
+    partitioner: Partitioner,
+): JavaDStream<Tuple2<K, Tuple2<Optional<V>, Optional<W>>>> =
+    toPairDStream()
+        .fullOuterJoin(
+            other.toPairDStream(),
+            partitioner,
+        )
+        .toTupleDStream()
+
+/**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+fun <K, V> JavaDStream<Tuple2<K, V>>.saveAsHadoopFiles(
+    prefix: String,
+    suffix: String,
+): Unit = toPairDStream().saveAsHadoopFiles(prefix, suffix)
+
+/**
+ * Save each RDD in `this` DStream as a Hadoop file using the new Hadoop API. The file name at each
+ * batch interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
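+ *
+ * A minimal usage sketch, assuming a `JavaDStream<Tuple2<String, Int>>` of word counts named
+ * `wordCounts` (the stream name and output location are illustrative, not part of this API):
+ * ```kotlin
+ * // each batch is written under a path named "hdfs:///counts/wordCounts-<TIME_IN_MS>.txt"
+ * wordCounts.saveAsNewAPIHadoopFiles(prefix = "hdfs:///counts/wordCounts", suffix = "txt")
+ * ```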
+ */ +fun JavaDStream>.saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, +): Unit = toPairDStream().saveAsNewAPIHadoopFiles(prefix, suffix) diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt index b4e08216..e95a65f7 100644 --- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt +++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt @@ -20,32 +20,9 @@ package org.jetbrains.kotlinx.spark.api/*- import ch.tutteli.atrium.api.fluent.en_GB.* import ch.tutteli.atrium.api.verbs.expect import io.kotest.core.spec.style.ShouldSpec -import io.kotest.matchers.should import io.kotest.matchers.shouldBe -import org.apache.spark.api.java.JavaDoubleRDD -import org.apache.spark.api.java.JavaPairRDD -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.functions.* -import org.apache.spark.sql.streaming.GroupState -import org.apache.spark.sql.streaming.GroupStateTimeout -import org.apache.spark.sql.types.Decimal -import org.apache.spark.unsafe.types.CalendarInterval -import scala.Product -import scala.Tuple1 -import scala.Tuple2 -import scala.Tuple3 import scala.collection.Seq import java.io.Serializable -import java.math.BigDecimal -import java.sql.Date -import java.sql.Timestamp -import java.time.Duration -import java.time.Instant -import java.time.LocalDate -import java.time.Period import kotlin.collections.Iterator import scala.collection.Iterator as ScalaIterator import scala.collection.Map as ScalaMap diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaHelper.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaHelper.kt new file mode 100644 index 00000000..6ec5924c --- /dev/null +++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaHelper.kt @@ -0,0 +1,148 @@ +/*- + * =LICENSE= + * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * =LICENSEEND= + */ +@file:Suppress("MemberVisibilityCanBePrivate", "BlockingMethodInNonBlockingContext") + +package org.jetbrains.kotlinx.spark.api + +/** + * Source: https://github.com/kotest/kotest-extensions-embedded-kafka + */ + +import io.github.embeddedkafka.EmbeddedKafka +import io.github.embeddedkafka.EmbeddedKafkaConfig +import io.kotest.core.listeners.TestListener +import io.kotest.core.spec.Spec +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.serialization.BytesDeserializer +import org.apache.kafka.common.serialization.BytesSerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.common.utils.Bytes +import scala.Predef +import java.util.* + +val embeddedKafkaListener: EmbeddedKafkaListener = EmbeddedKafkaListener(EmbeddedKafkaConfig.defaultConfig()) + +class EmbeddedKafkaListener( + private val config: EmbeddedKafkaConfig, +) : TestListener { + + constructor(port: Int) : this( + EmbeddedKafkaConfig.apply( + port, + EmbeddedKafkaConfig.defaultZookeeperPort(), + Predef.Map().empty(), + Predef.Map().empty(), + Predef.Map().empty(), + ) + ) + + constructor(kafkaPort: Int, zookeeperPort: Int) : this( + EmbeddedKafkaConfig.apply( + kafkaPort, + zookeeperPort, + Predef.Map().empty(), + Predef.Map().empty(), + Predef.Map().empty(), + ) + ) + + val port: Int = config.kafkaPort() + +// val host: String = "127.0.0.1" +// val host: String = "0.0.0.0" + val host: String = "localhost" + + val bootstrapServer = "$host:$port" + + override suspend fun beforeSpec(spec: Spec) { + EmbeddedKafka.start(config) + while (!EmbeddedKafka.isRunning()) { + Thread.sleep(100) + } + } + + override suspend fun afterSpec(spec: Spec) { + EmbeddedKafka.stop() + while (EmbeddedKafka.isRunning()) { + Thread.sleep(100) + } + } + + /** + * Returns a kafka consumer configured with the details of the embedded broker. + */ + fun stringStringConsumer(configure: Properties.() -> Unit = {}): KafkaConsumer { + val props = Properties() + props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port" + props[ConsumerConfig.GROUP_ID_CONFIG] = "test_consumer_group_" + System.currentTimeMillis() + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props.configure() + return KafkaConsumer(props, StringDeserializer(), StringDeserializer()) + } + + /** + * Returns a kafka consumer subscribed to the given topic on the embedded broker. + */ + fun stringStringConsumer(topic: String, configure: Properties.() -> Unit = {}): KafkaConsumer { + val consumer = stringStringConsumer(configure) + consumer.subscribe(listOf(topic)) + return consumer + } + + /** + * Returns a kafka consumer configured with the details of the embedded broker. + */ + fun bytesBytesConsumer(configure: Properties.() -> Unit = {}): KafkaConsumer { + val props = Properties() + props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port" + props[ConsumerConfig.GROUP_ID_CONFIG] = "test_consumer_group_" + System.currentTimeMillis() + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props.configure() + return KafkaConsumer(props, BytesDeserializer(), BytesDeserializer()) + } + + /** + * Returns a kafka consumer subscribed to the given topic on the embedded broker. 
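+ *
+ * A minimal round-trip sketch using the string-based helpers (topic name and payload are
+ * illustrative); it assumes the embedded broker is running, i.e. the listener was registered
+ * on the spec:
+ * ```kotlin
+ * val producer = embeddedKafkaListener.stringStringProducer()
+ * producer.send(ProducerRecord("test-topic", "key", "value"))
+ * producer.close()
+ *
+ * val consumer = embeddedKafkaListener.stringStringConsumer("test-topic")
+ * val records = consumer.poll(java.time.Duration.ofSeconds(5))
+ * // records should now contain the single "key" -> "value" message sent above
+ * consumer.close()
+ * ```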
+ */ + fun bytesBytesConsumer(topic: String, configure: Properties.() -> Unit = {}): KafkaConsumer { + val consumer = bytesBytesConsumer(configure) + consumer.subscribe(listOf(topic)) + return consumer + } + + fun bytesBytesProducer(configure: Properties.() -> Unit = {}): KafkaProducer { + val props = Properties() + props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port" + props.configure() + return KafkaProducer(props, BytesSerializer(), BytesSerializer()) + } + + fun stringStringProducer(configure: Properties.() -> Unit = {}): KafkaProducer { + val props = Properties() + props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = "$host:$port" + props.configure() + return KafkaProducer(props, StringSerializer(), StringSerializer()) + } +} + diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaStreamingTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaStreamingTest.kt new file mode 100644 index 00000000..b755b1e6 --- /dev/null +++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/KafkaStreamingTest.kt @@ -0,0 +1,116 @@ +/*- + * =LICENSE= + * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.api + +import io.kotest.core.Tag +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldBeIn +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.api.java.JavaInputDStream +import org.apache.spark.streaming.kafka010.ConsumerStrategies +import org.apache.spark.streaming.kafka010.KafkaUtils +import org.apache.spark.streaming.kafka010.LocationStrategies +import org.jetbrains.kotlinx.spark.api.tuples.* +import java.io.Serializable + +object Kafka : Tag() + +class KafkaStreamingTest : ShouldSpec({ + + // making sure it can be skipped on Github actions since it times out + tags(Kafka) + + context("kafka") { + val port = 9092 + val broker = "localhost:$port" + val topic1 = "test1" + val topic2 = "test2" + val kafkaListener = EmbeddedKafkaListener(port) + listener(kafkaListener) + + should("support kafka streams") { + val producer = kafkaListener.stringStringProducer() + producer.send(ProducerRecord(topic1, "Hello this is a test test test")) + producer.send(ProducerRecord(topic2, "This is also also a test test something")) + producer.close() + + withSparkStreaming( + batchDuration = Durations.seconds(2), + appName = "KotlinDirectKafkaWordCount", + timeout = 1000L, + ) { + + val kafkaParams: Map = mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to broker, + ConsumerConfig.GROUP_ID_CONFIG to "consumer-group", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ) + + // Create direct kafka stream with brokers and topics + val messages: JavaInputDStream> = KafkaUtils.createDirectStream( + ssc, + LocationStrategies.PreferConsistent(), + ConsumerStrategies.Subscribe(setOf(topic1, topic2), kafkaParams), + ) + + // Get the lines, split them into words, count the words and print + val lines = messages.map { it.topic() X it.value() } + val words = lines.flatMapValues { it.split(" ").iterator() } + + val wordCounts = words + .map { t(it, 1) } + .reduceByKey { a: Int, b: Int -> a + b } + .map { (tup, counter) -> tup + counter } + + val resultLists = mapOf( + topic1 to listOf( + "Hello" X 1, + "this" X 1, + "is" X 1, + "a" X 1, + "test" X 3, + ), + topic2 to listOf( + "This" X 1, + "is" X 1, + "also" X 2, + "a" X 1, + "test" X 2, + "something" X 1, + ) + ) + + wordCounts.foreachRDD { rdd, _ -> + rdd.foreach { (topic, word, count) -> + t(word, count).shouldBeIn(collection = resultLists[topic]!!) 
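+                        // every (word, count) pair computed for a topic must be one of the
+                        // expected word counts listed in resultLists above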
+ } + } + + wordCounts.print() + } + } + + } +}) diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ProjectConfig.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ProjectConfig.kt index 8516ae62..4238cd78 100644 --- a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ProjectConfig.kt +++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ProjectConfig.kt @@ -25,4 +25,6 @@ import io.kotest.extensions.allure.AllureTestReporter @Suppress("unused") object ProjectConfig : AbstractProjectConfig() { override fun listeners() = super.listeners() + AllureTestReporter(true) + + override fun extensions() = super.extensions() + AllureTestReporter(true) } diff --git a/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/StreamingTest.kt b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/StreamingTest.kt new file mode 100644 index 00000000..8ae7f5c2 --- /dev/null +++ b/kotlin-spark-api/3.2/src/test/kotlin/org/jetbrains/kotlinx/spark/api/StreamingTest.kt @@ -0,0 +1,218 @@ +/*- + * =LICENSE= + * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) + * ---------- + * Copyright (C) 2019 - 2022 JetBrains + * ---------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * =LICENSEEND= + */ +package org.jetbrains.kotlinx.spark.api + +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldBeIn +import io.kotest.matchers.collections.shouldContainAll +import io.kotest.matchers.shouldBe +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.FileSystem +import org.apache.spark.SparkException +import org.apache.spark.streaming.Checkpoint +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.Durations +import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils +import org.jetbrains.kotlinx.spark.api.tuples.X +import org.jetbrains.kotlinx.spark.api.tuples.component1 +import org.jetbrains.kotlinx.spark.api.tuples.component2 +import org.jetbrains.kotlinx.spark.api.tuples.t +import scala.Tuple2 +import java.io.File +import java.io.Serializable +import java.nio.charset.StandardCharsets +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean + + +class StreamingTest : ShouldSpec({ + + context("streaming") { + + should("stream") { + + val input = listOf("aaa", "bbb", "aaa", "ccc") + val counter = Counter(0) + + withSparkStreaming(Duration(10), timeout = 1000) { + + val (counterBroadcast, queue) = withSpark(ssc) { + spark.broadcast(counter) X LinkedList(listOf(sc.parallelize(input))) + } + + val inputStream = ssc.queueStream(queue) + + inputStream.foreachRDD { rdd, _ -> + withSpark(rdd) { + rdd.toDS().forEach { + it shouldBeIn input + counterBroadcast.value.value++ + } + } + } + } + + counter.value shouldBe input.size + + } + + should("Work with checkpointpath") { + val emptyDir = createTempDir() + val testDirectory = createTempDir() + val corruptedCheckpointDir = createCorruptedCheckpoint() + + val batchDuration = Durations.seconds(1) + val timeout = Durations.seconds(1).milliseconds() + + + val newContextCreated = AtomicBoolean(false) + + val creatingFun: KSparkStreamingSession.() -> Unit = { + println("created new context") + newContextCreated.set(true) + + // closing statement + ssc.textFileStream(testDirectory.absolutePath).foreachRDD { rdd, _ -> rdd.count() } + } + + // fill emptyDir with checkpoint + newContextCreated.set(false) + withSparkStreaming( + batchDuration = batchDuration, + checkpointPath = emptyDir.absolutePath, + props = mapOf("newContext" to true), + timeout = timeout, + func = creatingFun, + ) + newContextCreated.get() shouldBe true + + // check that creatingFun isn't executed when checkpoint is present + newContextCreated.set(false) + withSparkStreaming( + batchDuration = batchDuration, + checkpointPath = emptyDir.absolutePath, + props = mapOf("newContext" to true), + timeout = timeout, + func = creatingFun, + ) + newContextCreated.get() shouldBe false + + // check that creatingFun is not executed when createOnError = false using corrupted checkpoint + newContextCreated.set(false) + shouldThrow { + withSparkStreaming( + batchDuration = batchDuration, + checkpointPath = corruptedCheckpointDir, + props = mapOf("newContext" to true), + timeout = timeout, + func = creatingFun, + createOnError = false, + ) + } + newContextCreated.get() shouldBe false + + // check that creatingFun is executed when createOnError = true using corrupted checkpoint + newContextCreated.set(false) + withSparkStreaming( + batchDuration = batchDuration, + checkpointPath = corruptedCheckpointDir, + props = mapOf("newContext" to true), + timeout = timeout, + func = creatingFun, + createOnError = true, + ) + 
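+            // with createOnError = true the corrupted checkpoint is ignored, so a fresh
+            // context is created and creatingFun runs again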
newContextCreated.get() shouldBe true + } + + should("Have handy tuple2 functions") { + val input = listOf("aaa", "bbb", "aaa", "ccc") + val result = Result() + + withSparkStreaming(Duration(10), timeout = 1000, checkpointPath = createTempDir().absolutePath) { + + val (resultBroadcast, queue) = withSpark(ssc) { + spark.broadcast(result) X LinkedList(listOf(sc.parallelize(input))) + } + + val inputStream = ssc + + .queueStream(queue) // "aaa", "bbb", "aaa", "ccc" + + .map { it X 1 } // ("aaa", 1), ("bbb", 1), ("aaa", 1), ("ccc", 1) + + .reduceByKey(reduceFunc = Int::plus) // ("aaa", 2), ("bbb", 1), ("ccc", 1) + + .flatMapValues { iterator { yield(it); yield(it) } } // ("aaa", 2), ("aaa", 2), ("bbb", 1), ("bbb", 1), ("ccc", 1), ("ccc", 1) + + .groupByKey() // ("aaa", [2, 2]), ("bbb", [1, 1]), ("ccc", [1, 1]) + + .flatMap { (key, values) -> + values.mapIndexed { i, it -> key X it + i }.iterator() + } // ("aaa", 2), ("aaa", 3), ("bbb", 1), ("bbb", 2), ("ccc", 1), ("ccc", 2) + + .combineByKey( + createCombiner = { listOf(it) }, + mergeValue = { list, int -> + list + int + }, + mergeCombiner = { list1, list2 -> + list1 + list2 + }, + ) // ("aaa", [2, 3]), ("bbb", [1, 2]), ("ccc", [1, 2]) + + + // Note: this will update state inside the checkpoint, which we won't test here for now + .updateStateByKey(numPartitions = 3) { lists, s: Int? -> + (s ?: 0) + lists.sumOf { it.sum() } + } // ("aaa", 5), ("bbb", 3), ("ccc", 3) + + inputStream.foreachRDD { rdd, _ -> + withSpark(rdd) { + rdd.toDS().forEach { + it._1 shouldBeIn input + + resultBroadcast.value.list = resultBroadcast.value.list.plusElement(it) + } + } + } + } + + result.list.shouldContainAll(t("aaa", 5), t("bbb", 3), t("ccc", 3)) + } + } +}) + +private fun createTempDir() = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark") + .apply { deleteOnExit() } + +private fun createCorruptedCheckpoint(): String { + val checkpointDirectory = createTempDir().absolutePath + val fakeCheckpointFile = Checkpoint.checkpointFile(checkpointDirectory, Time(1000)) + FileUtils.write(File(fakeCheckpointFile.toString()), "blablabla", StandardCharsets.UTF_8) + assert(Checkpoint.getCheckpointFiles(checkpointDirectory, (null as FileSystem?).toOption()).nonEmpty()) + return checkpointDirectory +} + + +class Counter(@Volatile var value: Int) : Serializable + +class Result(@Volatile var list: List> = listOf()) : Serializable \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0df3adac..8c99ec41 100644 --- a/pom.xml +++ b/pom.xml @@ -10,12 +10,15 @@ pom - 1.5.30 + 1.6.20 1.6.10 - 0.16.0 - 4.6.0 - 1.0.1 + 0.17.0 + 5.2.3 + 1.1.0 + 1.3.1 + 3.1.0 3.2.1 + 3.3.1 2.10.0 diff --git a/scala-tuples-in-kotlin/pom_2.12.xml b/scala-tuples-in-kotlin/pom_2.12.xml index c06aa7ee..cf67af41 100644 --- a/scala-tuples-in-kotlin/pom_2.12.xml +++ b/scala-tuples-in-kotlin/pom_2.12.xml @@ -34,7 +34,7 @@ io.kotest.extensions kotest-extensions-allure - ${kotest-extension-allure.version} + ${kotest-extensions-allure.version} test