
Commit d0ae7fb

Merge pull request #144 from JetBrains/tuple-first

Tuple first

2 parents 405ed8e + d79d61a

File tree

37 files changed: +6980 -178 lines

README.md

Lines changed: 62 additions & 0 deletions

@@ -22,6 +22,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
 - [toList and toArray](#tolist-and-toarray-methods)
 - [Column infix/operator functions](#column-infixoperator-functions)
 - [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
+- [Tuples](#tuples)
 - [Examples](#examples)
 - [Reporting issues/Support](#reporting-issuessupport)
 - [Code of Conduct](#code-of-conduct)
@@ -204,6 +205,67 @@ We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately
 
 We have a special example of work with this function in the [Groups example](https://github.com/JetBrains/kotlin-spark-api/blob/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt).
 
+### Tuples
+
+Inspired by [ScalaTuplesInKotlin](https://github.com/Jolanrensen/ScalaTuplesInKotlin), the API introduces many helper extension functions
+that make working with Scala Tuples a breeze in your Kotlin Spark projects. While working with data classes is encouraged,
+Scala Tuples are recommended for pair-like Datasets / RDDs / DStreams, both for the useful helper functions and for Spark performance.
+To enable these features, simply add
+```kotlin
+import org.jetbrains.kotlinx.spark.api.tuples.*
+```
+to the start of your file.
+
+Tuples can be created in any of the following ways:
+```kotlin
+val a: Tuple2<Int, Long> = tupleOf(1, 2L)
+val b: Tuple3<String, Double, Int> = t("test", 1.0, 2)
+val c: Tuple3<Float, String, Int> = 5f X "aaa" X 1
+```
+Tuples can be expanded and merged like this:
+```kotlin
+// expand
+tupleOf(1, 2).appendedBy(3) == tupleOf(1, 2, 3)
+tupleOf(1, 2) + 3 == tupleOf(1, 2, 3)
+tupleOf(2, 3).prependedBy(1) == tupleOf(1, 2, 3)
+1 + tupleOf(2, 3) == tupleOf(1, 2, 3)
+
+// merge
+tupleOf(1, 2) concat tupleOf(3, 4) == tupleOf(1, 2, 3, 4)
+tupleOf(1, 2) + tupleOf(3, 4) == tupleOf(1, 2, 3, 4)
+
+// extend a tuple instead of merging with it
+tupleOf(1, 2).appendedBy(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
+tupleOf(1, 2) + tupleOf(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
+```
+
+The concept of `EmptyTuple` from Scala 3 is also present:
+```kotlin
+tupleOf(1).dropLast() == tupleOf() == emptyTuple()
+```
+
+Finally, all these tuple helper functions are also baked in:
+
+- `componentX()` for destructuring: `val (a, b) = tuple`
+- `dropLast()` / `dropFirst()`
+- `contains(x)` for `if (x in tuple) { ... }`
+- `iterator()` for `for (x in tuple) { ... }`
+- `asIterable()`
+- `size`
+- `get(n)` / `get(i..j)` for `tuple[1]` / `tuple[i..j]`
+- `getOrNull(n)` / `getOrNull(i..j)`
+- `getAs<T>(n)` / `getAs<T>(i..j)`
+- `getAsOrNull<T>(n)` / `getAsOrNull<T>(i..j)`
+- `copy(_1 = ..., _5 = ...)`
+- `first()` / `last()`
+- `_1`, `_6`, etc. (instead of `_1()`, `_6()`)
+- `zip`
+- `dropN()` / `dropLastN()`
+- `takeN()` / `takeLastN()`
+- `splitAtN()`
+- `map`
+- `cast`
 
 ## Examples

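Taken together, the helpers above read naturally in plain Kotlin. A minimal, self-contained sketch (not part of this commit; the exact indexing convention of `get` and the out-of-range behavior of `getOrNull` are assumptions based on the list above):

```kotlin
import org.jetbrains.kotlinx.spark.api.tuples.*
import scala.Tuple2
import scala.Tuple3

fun main() {
    // creation with the X infix, as in the README examples above
    val tuple: Tuple3<Int, String, Double> = 1 X "a" X 3.0

    // componentX() enables destructuring
    val (id, name, score) = tuple
    println("$id / $name / $score")

    // contains() and iterator() enable `in` checks and for-loops
    if ("a" in tuple) println("contains \"a\"")
    for (x in tuple) println(x)

    // get(n) enables indexing; getOrNull(n) presumably yields null out of range
    println(tuple[1])
    println(tuple.getOrNull(10))

    // copy() with the _X names works like a data-class copy
    val renamed = tuple.copy(_2 = "b")
    println(renamed._2)

    // dropLast() shrinks the arity statically; + appends an element
    val shorter: Tuple2<Int, String> = tuple.dropLast()
    val longer: Tuple3<Int, String, Long> = shorter + 5L
    println(longer)
}
```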
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/CachedOperations.kt

Lines changed: 4 additions & 3 deletions

@@ -20,17 +20,18 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
         dsOf(1, 2, 3, 4, 5)
-            .map { it to (it + 2) }
+            .map { it X (it + 2) }
             .withCached {
                 showDS()
 
-                filter { it.first % 2 == 0 }.showDS()
+                filter { it._1 % 2 == 0 }.showDS()
             }
-            .map { c(it.first, it.second, (it.first + it.second) * 2) }
+            .map { it.appendedBy(it._1 + it._2 * 2) }
             .show()
     }
 }

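The `Pair` → `Tuple2` switch drives the rest of this diff: Kotlin's `first`/`second` become the tuple's `_1`/`_2`, which the tuples module exposes as properties instead of the `_1()`/`_2()` methods Scala generates, and `appendedBy` widens the `Tuple2` to a `Tuple3` without spelling out all three components.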
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Collect.kt

Lines changed: 2 additions & 1 deletion

@@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.sql.Row
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
@@ -39,7 +40,7 @@ fun main() {
         }
 
         dsOf(1, 2, 3)
-            .map { c(it, it + 1, it + 2) }
+            .map { t(it, it + 1, it + 2) }
             .to<Row>()
             .select("_1")
             .collectAsList()

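Note that `.select("_1")` is untouched: Spark encodes the tuple as a struct with columns `_1`, `_2`, `_3`, so `t(it, it + 1, it + 2)` keeps the same column names the previous `c(...)` arity helper produced.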
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt

Lines changed: 9 additions & 2 deletions

@@ -20,12 +20,19 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
-        dsOf(c(1, "a"), c(1, "b"), c(2, "c"))
+        dsOf(
+            1 X "a",
+            1 X "b",
+            2 X "c",
+        )
             .groupByKey { it._1 }
-            .reduceGroupsK { a, b -> c(a._1 + b._1, a._2 + b._2) }
+            .reduceGroupsK { a, b ->
+                tupleOf(a._1 + b._1, a._2 + b._2)
+            }
             .show()
     }
 }

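Grouping on `_1` and reducing with int/string addition means the two tuples under key 1 fold into `(2, "ab")`, while `(2, "c")` passes through as the only member of its group.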
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Join.kt

Lines changed: 5 additions & 2 deletions

@@ -20,6 +20,7 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 
 data class Left(val id: Int, val name: String)
@@ -32,10 +33,12 @@ fun main() {
     val first = dsOf(Left(1, "a"), Left(2, "b"))
     val second = dsOf(Right(1, 100), Right(3, 300))
     first
-        .leftJoin(second, first.col("id").eq(second.col("id")))
+        .leftJoin(second, first.col("id") eq second.col("id"))
         .debugCodegen()
         .also { it.show() }
-        .map { c(it.first.id, it.first.name, it.second?.value) }
+        .map { (left, right) ->
+            left.id X left.name X right?.value
+        }
         .show()
 
 }

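Two small idioms here: `eq` is one of the Column infix functions the README's table of contents points to, so `first.col("id") eq second.col("id")` expresses the same join condition as the old `.eq(...)` call; and the `(left, right)` destructuring works because `leftJoin` yields tuples whose `componentX()` functions come from the tuples module, with `right` nullable on the outer side of the left join.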
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt

Lines changed: 29 additions & 12 deletions

@@ -20,36 +20,53 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.api.java.function.ReduceFunction
+import org.apache.spark.sql.Dataset
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
+import scala.*
 
 data class Q<T>(val id: Int, val text: T)
+@Suppress("RedundantLambdaArrow", "UsePropertyAccessSyntax")
 object Main {
 
     @JvmStatic
     fun main(args: Array<String>) {
-
         val spark = SparkSession
             .builder()
             .master("local[2]")
-            .appName("Simple Application").orCreate
-
-        val triples = spark
-            .toDS(listOf(Q(1, 1 to null), Q(2, 2 to "22"), Q(3, 3 to "333")))
-            .map { (a, b) -> a + b.first to b.second?.length }
-            .map { it to 1 }
-            .map { (a, b) -> Triple(a.first, a.second, b) }
+            .appName("Simple Application")
+            .getOrCreate()
 
+        val triples: Dataset<Tuple3<Int, Int?, Int>> = spark
+            .toDS(
+                listOf(
+                    Q(1, 1 X null),
+                    Q(2, 2 X "22"),
+                    Q(3, 3 X "333"),
+                )
+            )
+            .map { (a, b) -> t(a + b._1, b._2?.length) }
+            .map { it: Tuple2<Int, Int?> -> it + 1 } // add counter
 
         val pairs = spark
-            .toDS(listOf(2 to "hell", 4 to "moon", 6 to "berry"))
+            .toDS(
+                listOf(
+                    2 X "hell",
+                    4 X "moon",
+                    6 X "berry",
+                )
+            )
 
         triples
-            .leftJoin(pairs, triples.col("first").multiply(2).eq(pairs.col("first")))
+            .leftJoin(
+                right = pairs,
+                col = triples("_1").multiply(2) eq pairs("_1"),
+            )
             // .also { it.printSchema() }
-            .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
+            .map { (triple, pair) -> Five(triple._1, triple._2, triple._3, pair?._1, pair?._2) }
             .groupByKey { it.a }
             .reduceGroupsK { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) }
-            .map { it.second }
+            .map { it._2 }
            .repartition(1)
            .withCached {
                write()

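Worth noting: `it + 1` uses the README's rule that `+` with a non-tuple operand appends an element, which is exactly what turns the `Tuple2<Int, Int?>` into the declared `Tuple3<Int, Int?, Int>` (the "add counter" step), and `triples("_1")` is invoke-style shorthand for the column lookup previously written as `.col(...)`.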
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/MapAndListOperations.kt

Lines changed: 9 additions & 4 deletions

@@ -20,16 +20,21 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
         dsOf(
-            mapOf(1 to c(1, 2, 3), 2 to c(1, 2, 3)),
-            mapOf(3 to c(1, 2, 3), 4 to c(1, 2, 3)),
+            mapOf(1 to t(1, 2, 3), 2 to t(1, 2, 3)),
+            mapOf(3 to t(1, 2, 3), 4 to t(1, 2, 3)),
         )
-            .flatMap { it.toList().map { p -> listOf(p.first, p.second._1, p.second._2, p.second._3) }.iterator() }
+            .flatMap {
+                it.toList()
+                    .map { (first, tuple) -> (first + tuple).toList() }
+                    .iterator()
+            }
             .flatten()
-            .map { c(it) }
+            .map { tupleOf(it) }
            .also { it.printSchema() }
            .distinct()
            .sort("_1")

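Here `(first + tuple)` relies on the prepend rule (`1 + tupleOf(2, 3) == tupleOf(1, 2, 3)`), so each map entry becomes a `Tuple4` whose `toList()` then feeds `flatten()`; the unchanged `.sort("_1")` works because both the old arity classes and Scala tuples expose their fields as `_1`, `_2`, and so on.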
examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/WordCount.kt

Lines changed: 5 additions & 4 deletions

@@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.sql.Dataset
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 const val MEANINGFUL_WORD_LENGTH = 4
 
@@ -33,15 +34,15 @@ fun main() {
         .flatten()
         .cleanup()
         .groupByKey { it }
-        .mapGroups { k, iter -> k to iter.asSequence().count() }
-        .sort { arrayOf(it.col("second").desc()) }
+        .mapGroups { k, iter -> k X iter.asSequence().count() }
+        .sort { arrayOf(it(colName = "_2").desc()) }
        .limit(20)
-        .map { it.second to it.first }
+        .map { it.swap() }
        .show(false)
    }
}

-fun Dataset<String>.cleanup() =
+fun Dataset<String>.cleanup(): Dataset<String> =
    filter { it.isNotBlank() }
        .map { it.trim(',', ' ', '\n', ':', '.', ';', '?', '!', '"', '\'', '\t', ' ') }
        .filter { !it.endsWith("n’t") }

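`swap()` here is Scala's own `Tuple2.swap`, replacing the manual `it.second to it.first` reversal, and the sort key changes from `"second"` to `"_2"` to match the tuple's column names; the explicit `Dataset<String>` return type on `cleanup()` is purely for readability.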
kotlin-spark-api/3.2/pom_2.12.xml

Lines changed: 10 additions & 0 deletions

@@ -27,6 +27,10 @@
         <groupId>org.jetbrains.kotlinx.spark</groupId>
         <artifactId>core-3.2_${scala.compat.version}</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.jetbrains.kotlinx.spark</groupId>
+        <artifactId>scala-tuples-in-kotlin</artifactId>
+    </dependency>
 
 
     <!-- Provided dependencies -->
@@ -75,6 +79,7 @@
     <testSourceDirectory>src/test/kotlin</testSourceDirectory>
     <directory>target/${scala.compat.version}</directory>
     <plugins>
+
         <plugin>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-maven-plugin</artifactId>
@@ -93,10 +98,12 @@
                 </execution>
             </executions>
         </plugin>
+
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
         </plugin>
+
         <plugin>
             <groupId>org.jetbrains.dokka</groupId>
             <artifactId>dokka-maven-plugin</artifactId>
@@ -121,17 +128,20 @@
                 </execution>
             </executions>
         </plugin>
+
         <plugin>
             <groupId>io.qameta.allure</groupId>
             <artifactId>allure-maven</artifactId>
             <configuration>
                 <resultsDirectory>${project.basedir}/allure-results/${scala.compat.version}</resultsDirectory>
             </configuration>
         </plugin>
+
         <plugin>
             <groupId>org.jacoco</groupId>
             <artifactId>jacoco-maven-plugin</artifactId>
         </plugin>
+
     </plugins>
 </build>
</project>

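The new `scala-tuples-in-kotlin` dependency is what lets downstream users of `kotlin-spark-api` resolve `org.jetbrains.kotlinx.spark.api.tuples.*` without adding anything themselves; the remaining changes in this file are blank lines between plugins.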