
Commit 72fb5ea

Jolanrensen authored and asm0dey committed
fix: adds reduceK function to avoid resolution ambiguity for reduce
1 parent 8e7523a commit 72fb5ea
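
To illustrate the problem this commit addresses (a minimal editorial sketch, not code from the commit): Spark's `Dataset.reduce` has both a Scala overload taking `(T, T) => T` and a Java overload taking `ReduceFunction<T>`, so calling it with a plain Kotlin lambda can be rejected as ambiguous. The new `reduceK` extension, mirroring the existing `reduceGroupsK`, wraps the lambda in a `ReduceFunction` explicitly. The values below are made up:

```kotlin
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    val ds = dsOf(1, 2, 3, 4)

    // ds.reduce { a, b -> a + b }          // can hit overload resolution ambiguity (Scala vs. Java overload)
    val sum = ds.reduceK { a, b -> a + b }  // unambiguous: reduceK wraps the lambda in a ReduceFunction
    println(sum)                            // 10
}
```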

File tree

6 files changed (+123 −108 lines)
  • examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples
  • kotlin-spark-api
    • 2.4/src
      • main/kotlin/org/jetbrains/kotlinx/spark/api
      • test/kotlin/org/jetbrains/kotlinx/spark/api
    • 3.0/src
      • main/kotlin/org/jetbrains/kotlinx/spark/api
      • test/kotlin/org/jetbrains/kotlinx/spark/api


README.md

Lines changed: 72 additions & 97 deletions
@@ -1,15 +1,12 @@
 # Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.1) [![official JetBrains project](http://jb.gg/badges/incubator.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub)
 
-Your next API to work with [Apache Spark](https://spark.apache.org/).
 
-This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/)
-and [Apache Spark](https://spark.apache.org/). It allows Kotlin developers to use familiar language features such as
-data classes, and lambda expressions as simple expressions in curly braces or method references.
+Your next API to work with [Apache Spark](https://spark.apache.org/).
 
-We have opened a Spark Project Improvement
-Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the
-community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your
-opinions and participate in the discussion.
+This project adds a missing layer of compatibility between [Kotlin](https://kotlinlang.org/) and [Apache Spark](https://spark.apache.org/).
+It allows Kotlin developers to use familiar language features such as data classes, and lambda expressions as simple expressions in curly braces or method references.
+
+We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache Spark](http://issues.apache.org/jira/browse/SPARK-32530#) to work with the community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your opinions and participate in the discussion.
 
 ## Table of Contents

@@ -24,7 +21,7 @@ opinions and participate in the discussion.
 - [withCached function](#withcached-function)
 - [toList and toArray](#tolist-and-toarray-methods)
 - [Column infix/operator functions](#column-infixoperator-functions)
-- [`reduceGroups`](#reducegroups)
+- [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
 - [Examples](#examples)
 - [Reporting issues/Support](#reporting-issuessupport)
 - [Code of Conduct](#code-of-conduct)
@@ -40,145 +37,133 @@ opinions and participate in the discussion.
 
 ## Releases
 
-The list of Kotlin for Apache Spark releases is
-available [here](https://github.com/JetBrains/kotlin-spark-api/releases/). The Kotlin for Spark artifacts adhere to the
-following convention:
-`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`
+The list of Kotlin for Apache Spark releases is available [here](https://github.com/JetBrains/kotlin-spark-api/releases/).
+The Kotlin for Spark artifacts adhere to the following convention:
+`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`
 
 [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22org.jetbrains.kotlinx.spark%22%20AND%20a:%22kotlin-spark-api-3.0.0_2.12%22)
 
 ## How to configure Kotlin for Apache Spark in your project
 
-You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `leinengen` are
-supported.
-
+You can add Kotlin for Apache Spark as a dependency to your project: `Maven`, `Gradle`, `SBT`, and `leinengen` are supported.
+
 Here's an example `pom.xml`:
 
 ```xml
-
 <dependency>
-    <groupId>org.jetbrains.kotlinx.spark</groupId>
-    <artifactId>kotlin-spark-api-3.0.0</artifactId>
-    <version>${kotlin-spark-api.version}</version>
+    <groupId>org.jetbrains.kotlinx.spark</groupId>
+    <artifactId>kotlin-spark-api-3.0.0</artifactId>
+    <version>${kotlin-spark-api.version}</version>
 </dependency>
 <dependency>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-sql_2.12</artifactId>
-    <version>${spark.version}</version>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.12</artifactId>
+    <version>${spark.version}</version>
 </dependency>
 ```
 
 Note that `core` is being compiled against Scala version `2.12`.
-You can find a complete example with `pom.xml` and `build.gradle` in
-the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
-
-Once you have configured the dependency, you only need to add the following import to your Kotlin file:
+You can find a complete example with `pom.xml` and `build.gradle` in the [Quick Start Guide](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
 
+Once you have configured the dependency, you only need to add the following import to your Kotlin file:
 ```kotlin
 import org.jetbrains.kotlinx.spark.api.*
 ```
 
 ## Kotlin for Apache Spark features
 
 ### Creating a SparkSession in Kotlin
-
 ```kotlin
 val spark = SparkSession
-    .builder()
-    .master("local[2]")
-    .appName("Simple Application").orCreate
+    .builder()
+    .master("local[2]")
+    .appName("Simple Application").orCreate
 
 ```
 
 ### Creating a Dataset in Kotlin
-
 ```kotlin
 spark.toDS("a" to 1, "b" to 2)
 ```
-
 The example above produces `Dataset<Pair<String, Int>>`.
-
+
 ### Null safety
-
-There are several aliases in API, like `leftJoin`, `rightJoin` etc. These are null-safe by design. For
-example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`. Note that we are forcing `RIGHT`
-to be nullable for you as a developer to be able to handle this situation.
+There are several aliases in API, like `leftJoin`, `rightJoin` etc. These are null-safe by design.
+For example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`.
+Note that we are forcing `RIGHT` to be nullable for you as a developer to be able to handle this situation.
 `NullPointerException`s are hard to debug in Spark, and we doing our best to make them as rare as possible.
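
A minimal illustrative sketch (not part of the diff above), assuming the `leftJoin(other, columnExpression)` overload described in this section; the data classes and column names are made up:

```kotlin
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.spark.api.*

data class Employee(val name: String, val deptId: Int)
data class Dept(val id: Int, val title: String)

fun main() = withSpark {
    val employees = dsOf(Employee("Ann", 1), Employee("Bob", 99))
    val depts = dsOf(Dept(1, "Engineering"))

    // RIGHT is forced to be nullable: Bob has no matching department, so his Dept is null.
    val joined: Dataset<Pair<Employee, Dept?>> =
        employees.leftJoin(depts, employees.col("deptId") eq depts.col("id"))

    joined.show()
}
```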

 ### withSpark function
 
-We provide you with useful function `withSpark`, which accepts everything that may be needed to run Spark — properties,
-name, master location and so on. It also accepts a block of code to execute inside Spark context.
+We provide you with useful function `withSpark`, which accepts everything that may be needed to run Spark — properties, name, master location and so on. It also accepts a block of code to execute inside Spark context.
 
 After work block ends, `spark.stop()` is called automatically.
 
 ```kotlin
 withSpark {
     dsOf(1, 2)
-        .map { it to it }
-        .show()
+        .map { it to it }
+        .show()
 }
 ```
 
 `dsOf` is just one more way to create `Dataset` (`Dataset<Int>`) from varargs.
 
-### `withCached` function
-
-It can easily happen that we need to fork our computation to several paths. To compute things only once we should
-call `cache`
-method. However, it becomes difficult to control when we're using cached `Dataset` and when not. It is also easy to
-forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.
+### withCached function
+It can easily happen that we need to fork our computation to several paths. To compute things only once we should call `cache`
+method. However, it becomes difficult to control when we're using cached `Dataset` and when not.
+It is also easy to forget to unpersist cached data, which can break things unexpectedly or take up more memory
+than intended.
 
 To solve these problems we've added `withCached` function
 
 ```kotlin
 withSpark {
     dsOf(1, 2, 3, 4, 5)
-        .map { it to (it + 2) }
-        .withCached {
-            showDS()
-
-            filter { it.first % 2 == 0 }.showDS()
-        }
-        .map { c(it.first, it.second, (it.first + it.second) * 2) }
-        .show()
+        .map { it to (it + 2) }
+        .withCached {
+            showDS()
+
+            filter { it.first % 2 == 0 }.showDS()
+        }
+        .map { c(it.first, it.second, (it.first + it.second) * 2) }
+        .show()
 }
 ```
 
-Here we're showing cached `Dataset` for debugging purposes then filtering it. The `filter` method returns
-filtered `Dataset` and then the cached `Dataset` is being unpersisted, so we have more memory t o call the `map` method
-and collect the resulting `Dataset`.
+Here we're showing cached `Dataset` for debugging purposes then filtering it.
+The `filter` method returns filtered `Dataset` and then the cached `Dataset` is being unpersisted, so we have more memory t
+o call the `map` method and collect the resulting `Dataset`.
 
-### `toList` and `toArray` methods
+### toList and toArray methods
 
-For more idiomatic Kotlin code we've added `toList` and `toArray` methods in this API. You can still use the `collect`
-method as in Scala API, however the result should be casted to `Array`. This is because `collect` returns a Scala array,
-which is not the same as Java/Kotlin one.
+For more idiomatic Kotlin code we've added `toList` and `toArray` methods in this API. You can still use the `collect` method as in Scala API, however the result should be casted to `Array`.
+This is because `collect` returns a Scala array, which is not the same as Java/Kotlin one.
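
Another illustrative sketch (not part of the diff), assuming `toList()` and `toArray()` are the collector extensions this section refers to:

```kotlin
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    val ds = dsOf(1, 2, 3)

    val asList: List<Int> = ds.toList()     // a Kotlin List, no cast from a Scala array needed
    val asArray: Array<Int> = ds.toArray()  // a Java/Kotlin Array rather than a Scala one

    println(asList)        // [1, 2, 3]
    println(asArray.size)  // 3
}
```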

 ### Column infix/operator functions
 
-Similar to the Scala API for `Columns`, many of the operator functions could be ported over. For example:
-
+Similar to the Scala API for `Columns`, many of the operator functions could be ported over.
+For example:
 ```kotlin
-dataset.select(col("colA") + 5)
-dataset.select(col("colA") / col("colB"))
+dataset.select( col("colA") + 5 )
+dataset.select( col("colA") / col("colB") )
 
-dataset.where(col("colA") `===` 6)
+dataset.where( col("colA") `===` 6 )
 // or alternatively
-dataset.where(col("colA") eq 6)
+dataset.where( col("colA") eq 6)
 ```
 
 In short, all supported operators are:
 
 - `==`,
-- `!=`,
+- `!=`,
 - `eq` / `` `===` ``,
 - `neq` / `` `=!=` ``,
 - `-col(...)`,
-- `!col(...)`,
+- `!col(...)`,
 - `gt`,
 - `lt`,
-- `geq`,
+- `geq`,
 - `leq`,
 - `or`,
 - `and` / `` `&&` ``,
@@ -190,53 +175,43 @@ In short, all supported operators are:
 
 Secondly, there are some quality of life additions as well:
 
-In Kotlin, Ranges are often used to solve inclusive/exclusive situations for a range. So, you can now do:
-
+In Kotlin, Ranges are often
+used to solve inclusive/exclusive situations for a range. So, you can now do:
 ```kotlin
-dataset.where(col("colA") inRangeOf 0..2)
+dataset.where( col("colA") inRangeOf 0..2 )
 ```
 
 Also, for columns containing map- or array like types:
 
 ```kotlin
-dataset.where(col("colB")[0] geq 5)
+dataset.where( col("colB")[0] geq 5 )
 ```
 
-Finally, thanks to Kotlin reflection, we can provide a type- and refactor safe way to create `TypedColumn`s and with
-those a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
-
+Finally, thanks to Kotlin reflection, we can provide a type- and refactor safe way
+to create `TypedColumn`s and with those a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
 ```kotlin
 val dataset: Dataset<YourClass> = ...
 val newDataset: Dataset<Pair<TypeA, TypeB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))
 ```
 
-### `reduceGroups`
+### Overload resolution ambiguity
+
+We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately as `reduceGroupsK` and `reduceK` respectively, because otherwise it caused resolution ambiguity between Kotlin, Scala and Java APIs, which was quite hard to solve.
 
-We had to implemet `reduceGroups` operator for Kotlin separately as `reduceGroupsK` function, because otherwise it
-caused resolution ambiguity between Kotlin, Scala and Java APIs, which was quite hard to solve.
+We have a special example of work with this function in the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt).
 
-We have a special example of work with this function in
-the [Groups example](https://github.com/JetBrains/kotlin-spark-api/edit/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt)
-.

 ## Examples
 
-For more, check
-out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples)
-module. To get up and running quickly, check out
-this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
+For more, check out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
+To get up and running quickly, check out this [tutorial](https://github.com/JetBrains/kotlin-spark-api/wiki/Quick-Start-Guide).
 
 ## Reporting issues/Support
-
-Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug
-reports. You are also welcome to join [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the
-Kotlin Slack.
+Please use [GitHub issues](https://github.com/JetBrains/kotlin-spark-api/issues) for filing feature requests and bug reports.
+You are also welcome to join [kotlin-spark channel](https://kotlinlang.slack.com/archives/C015B9ZRGJF) in the Kotlin Slack.
 
 ## Code of Conduct
-
-This project and the corresponding community is governed by
-the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct)
-. Please make sure you read it.
+This project and the corresponding community is governed by the [JetBrains Open Source and Community Code of Conduct](https://confluence.jetbrains.com/display/ALL/JetBrains+Open+Source+and+Community+Code+of+Conduct). Please make sure you read it.
 
 ## License

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt

Lines changed: 2 additions & 2 deletions
@@ -48,8 +48,8 @@ object Main {
             // .also { it.printSchema() }
             .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
             .groupByKey { it.a }
-            .reduceGroups(ReduceFunction { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) })
-            .map { it._2 }
+            .reduceGroupsK { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) }
+            .map { it.second }
             .repartition(1)
             .withCached {
                 write()
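
A note on the `it._2` to `it.second` change (an observation, not from the commit itself): the Scala-facing `reduceGroups` yields `scala.Tuple2` values, hence the old `it._2`, while the Kotlin-specific `reduceGroupsK` converts each tuple into a Kotlin `Pair` (see the ApiV1.kt diff below, where the analogous wrapper maps `t._1 to t._2`), so the value is now read with `it.second`.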

kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt

Lines changed: 7 additions & 0 deletions
@@ -217,6 +217,13 @@ inline fun <reified KEY, reified VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduc
     reduceGroups(ReduceFunction(func))
         .map { t -> t._1 to t._2 }
 
+/**
+ * (Kotlin-specific)
+ * Reduces the elements of this Dataset using the specified binary function. The given `func`
+ * must be commutative and associative or the result may be non-deterministic.
+ */
+inline fun <reified T> Dataset<T>.reduceK(noinline func: (T, T) -> T): T =
+    reduce(ReduceFunction(func))
 
 @JvmName("takeKeysTuple2")
 inline fun <reified T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> = map { it._1() }
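
Not part of the commit: a minimal usage sketch of the new extension alongside the existing `reduceGroupsK`, with made-up numbers (explicit lambda parameter types are kept on `reduceK`, matching the test added below):

```kotlin
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    val result: Pair<Int, Int> = dsOf(1 to 2, 1 to 3, 2 to 4)
        .groupByKey { it.first }                                     // key by the first element
        .reduceGroupsK { a, b -> a.first to (a.second + b.second) }  // Dataset<Pair<Int, Pair<Int, Int>>>
        .map { it.second }                                           // Dataset<Pair<Int, Int>>
        .reduceK { a: Pair<Int, Int>, b: Pair<Int, Int> -> (a.first + b.first) to (a.second + b.second) }

    println(result)  // (3, 9)
}
```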

kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt

Lines changed: 19 additions & 6 deletions
@@ -21,20 +21,21 @@ import ch.tutteli.atrium.api.fluent.en_GB.*
 import ch.tutteli.atrium.api.verbs.expect
 import io.kotest.core.spec.style.ShouldSpec
 import io.kotest.matchers.shouldBe
-import org.apache.spark.sql.streaming.GroupState
-import org.apache.spark.sql.streaming.GroupStateTimeout
-import scala.collection.Seq
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.TypedColumn
 import org.apache.spark.sql.functions.*
+import org.apache.spark.sql.streaming.GroupState
+import org.apache.spark.sql.streaming.GroupStateTimeout
 import scala.Product
 import scala.Tuple1
 import scala.Tuple2
 import scala.Tuple3
+import scala.collection.Seq
 import java.io.Serializable
 import java.sql.Date
 import java.sql.Timestamp
 import java.time.LocalDate
+import kotlin.collections.Iterator
 import scala.collection.Iterator as ScalaIterator
 import scala.collection.Map as ScalaMap
 import scala.collection.mutable.Map as ScalaMutableMap
@@ -457,7 +458,7 @@ class ApiTest : ShouldSpec({
                 SomeClass(intArrayOf(4, 3, 2), 1),
             )
                 .groupByKey { it.b }
-                .reduceGroupsK(func = { a, b -> SomeClass(a.a + b.a, a.b) })
+                .reduceGroupsK { a, b -> SomeClass(a.a + b.a, a.b) }
                 .takeValues()
 
             dataset.count() shouldBe 1
@@ -473,6 +474,18 @@ class ApiTest : ShouldSpec({
            dataset.sort(SomeClass::a, SomeClass::b)
            dataset.takeAsList(1).first().b shouldBe 2
        }
+       should("Have Kotlin ready functions in place of overload ambiguity") {
+           val dataset: Pair<Int, SomeClass> = dsOf(
+               SomeClass(intArrayOf(1, 2, 3), 1),
+               SomeClass(intArrayOf(4, 3, 2), 1),
+           )
+               .groupByKey { it: SomeClass -> it.b }
+               .reduceGroupsK { v1: SomeClass, v2: SomeClass -> v1 }
+               .filter { it: Pair<Int, SomeClass> -> true } // not sure why this does work, but reduce doesn't
+               .reduceK { v1: Pair<Int, SomeClass>, v2: Pair<Int, SomeClass> -> v1 }
+
+           dataset.second.a shouldBe intArrayOf(1, 2, 3)
+       }
        should("Generate encoder correctly with complex enum data class") {
            val dataset: Dataset<ComplexEnumDataClass> =
                dsOf(
@@ -495,7 +508,7 @@ class ApiTest : ShouldSpec({
 
            first.int shouldBe 1
            first.string shouldBe "string"
-           first.strings shouldBe listOf("1","2")
+           first.strings shouldBe listOf("1", "2")
            first.someEnum shouldBe SomeEnum.A
            first.someOtherEnum shouldBe SomeOtherEnum.C
            first.someEnums shouldBe listOf(SomeEnum.A, SomeEnum.B)
@@ -551,5 +564,5 @@ data class ComplexEnumDataClass(
     val someOtherEnums: List<SomeOtherEnum>,
     val someEnumArray: Array<SomeEnum>,
     val someOtherArray: Array<SomeOtherEnum>,
-    val enumMap: Map<SomeEnum, SomeOtherEnum>
+    val enumMap: Map<SomeEnum, SomeOtherEnum>,
 )
