@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,8 +20,13 @@
 package org.jetbrains.kotlinx.spark.api
 
 import io.kotest.core.Tag
-import io.kotest.core.spec.style.ShouldSpec
-import io.kotest.matchers.collections.shouldBeIn
+import io.kotest.core.extensions.install
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.extensions.testcontainers.TestContainerExtension
+import io.kotest.extensions.testcontainers.kafka.createStringStringConsumer
+import io.kotest.extensions.testcontainers.kafka.createStringStringProducer
+import io.kotest.matchers.collections.shouldContain
+import io.kotest.matchers.collections.shouldContainAll
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -32,85 +37,94 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
 import org.apache.spark.streaming.kafka010.KafkaUtils
 import org.apache.spark.streaming.kafka010.LocationStrategies
 import org.jetbrains.kotlinx.spark.api.tuples.*
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
+import scala.Tuple3
 import java.io.Serializable
+import java.time.Duration
 
 object Kafka : Tag()
 
-class KafkaStreamingTest : ShouldSpec({
+class KafkaStreamingTest : FunSpec() {
+    init {
 
-    // making sure it can be skipped on Github actions since it times out
-    tags(Kafka)
+        tags(Kafka)
 
-    xcontext("kafka") {
-        val port = 9092
-        val broker = "localhost:$port"
-        val topic1 = "test1"
-        val topic2 = "test2"
-        val kafkaListener = EmbeddedKafkaListener(port)
-        listener(kafkaListener)
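+        // Start a single-node Kafka broker in a Docker container; the kotest
+        // Testcontainers extension manages the container's lifecycle for the spec.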
+        val kafka = install(
+            TestContainerExtension(KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")))
+        ) {
+            withEmbeddedZookeeper()
+            withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+        }
+        println(kafka.bootstrapServers)
+        test("Streaming should support kafka") {
+            val topic1 = "test1"
+            val topic2 = "test2"
 
-        should("support kafka streams") {
-            val producer = kafkaListener.stringStringProducer()
-            producer.send(ProducerRecord(topic1, "Hello this is a test test test"))
-            producer.send(ProducerRecord(topic2, "This is also also a test test something"))
-            producer.close()
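+            // Expected word counts per topic for the two messages produced below.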
+            val resultLists = mapOf(
+                topic1 to listOf(
+                    "Hello" X 1,
+                    "this" X 1,
+                    "is" X 1,
+                    "a" X 1,
+                    "test" X 3,
+                ),
+                topic2 to listOf(
+                    "This" X 1,
+                    "is" X 1,
+                    "also" X 2,
+                    "a" X 1,
+                    "test" X 2,
+                    "something" X 1,
+                )
+            )
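+            // Accumulates the (topic, word, count) tuples of every micro-batch.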
+            val data = arrayListOf<List<Tuple3<String, String, Int>>>()
 
             withSparkStreaming(
-                batchDuration = Durations.seconds(2),
+                batchDuration = Durations.milliseconds(1000),
                 appName = "KotlinDirectKafkaWordCount",
-                timeout = 1000L,
+                timeout = 10_000L,
+                master = "local"
             ) {
 
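+                // Send the test messages only after the streaming context has started,
+                // so the newly created direct stream sees them.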
+                setRunAfterStart {
+                    val producer = autoClose(kafka.createStringStringProducer())
+                    producer.send(ProducerRecord(topic1, "Hello this is a test test test"))
+                    producer.send(ProducerRecord(topic2, "This is also also a test test something"))
+                }
+
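+                // The consumer connects to the container's dynamically mapped broker port.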
                 val kafkaParams: Map<String, Serializable> = mapOf(
-                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to broker,
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${kafka.host}:${kafka.getMappedPort(KafkaContainer.KAFKA_PORT)}",
                     ConsumerConfig.GROUP_ID_CONFIG to "consumer-group",
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                 )
-
                 // Create direct kafka stream with brokers and topics
                 val messages: JavaInputDStream<ConsumerRecord<String, String>> = KafkaUtils.createDirectStream(
                     ssc,
-                    LocationStrategies.PreferConsistent(),
+                    LocationStrategies.PreferBrokers(),
                     ConsumerStrategies.Subscribe(setOf(topic1, topic2), kafkaParams),
                 )
 
                 // Get the lines, split them into words, count the words and print
-                val lines = messages.map { it.topic() X it.value() }
-                val words = lines.flatMapValues { it.split(" ").iterator() }
 
-                val wordCounts = words
+                val wordCounts = messages
+                    .map { it.topic() X it.value() }
+                    .flatMapValues { it.split(" ").iterator() }
                     .map { t(it, 1) }
                     .reduceByKey { a: Int, b: Int -> a + b }
                     .map { (tup, counter) -> tup + counter }
 
-                val resultLists = mapOf(
-                    topic1 to listOf(
-                        "Hello" X 1,
-                        "this" X 1,
-                        "is" X 1,
-                        "a" X 1,
-                        "test" X 3,
-                    ),
-                    topic2 to listOf(
-                        "This" X 1,
-                        "is" X 1,
-                        "also" X 2,
-                        "a" X 1,
-                        "test" X 2,
-                        "something" X 1,
-                    )
-                )
 
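+                // Collect each micro-batch to the driver; the assertion runs after the
+                // streaming context has stopped.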
                 wordCounts.foreachRDD { rdd, _ ->
-                    rdd.foreach { (topic, word, count) ->
-                        t(word, count).shouldBeIn(collection = resultLists[topic]!!)
-                    }
+                    data.add(rdd.collect())
                 }
+            }
 
-                wordCounts.print()
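+            // Flatten the expected counts to (topic, word, count) tuples and check
+            // that every one of them was observed.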
+            val resultList = resultLists.flatMap { (topic, tuples) ->
+                tuples.map { it.prependedBy(topic) }
             }
+            data.flatten() shouldContainAll resultList
         }
-
     }
-})
+}