Tuple first #144

Merged · 25 commits · Apr 11, 2022

Commits
aff3edc
added deprecation notice to arities
Jolanrensen Mar 23, 2022
c19d6d8
adding all tuple functions from scalaTuplesInKotlin, starting library…
Jolanrensen Mar 23, 2022
4b82192
adding all tuple functions from scalaTuplesInKotlin, starting library…
Jolanrensen Mar 23, 2022
558c3ab
adding updated tuple functions from scala tuples in kotlin library.
Jolanrensen Mar 24, 2022
a455710
now using X for tuples, updated code and examples/tests. Currently, w…
Jolanrensen Mar 25, 2022
9acd8b9
updating tuple docs
Jolanrensen Mar 28, 2022
d96dd55
updating docs and tests
Jolanrensen Mar 28, 2022
2367d87
calming qodana
Jolanrensen Mar 29, 2022
fb01c77
added tuple zip-functions
Jolanrensen Mar 29, 2022
b2350d2
updated readme for tuples
Jolanrensen Mar 29, 2022
2b5d283
added map and take(Last)n functions. Needs docs and more tests
Jolanrensen Mar 29, 2022
1287cab
attempt to exclude tuples from qodana
Jolanrensen Mar 30, 2022
a07673d
attempt to exclude tuples from qodana
Jolanrensen Mar 30, 2022
19be287
drop, splitAt and tests
Jolanrensen Mar 30, 2022
e0037a0
checking to see whether qodana.yaml is even working
Jolanrensen Mar 30, 2022
b13cbd9
checking to see whether qodana.yaml is even working
Jolanrensen Mar 30, 2022
a7484d2
updating docs and readme
Jolanrensen Mar 30, 2022
2002f95
adding error to qodana.yaml on purpose
Jolanrensen Mar 30, 2022
5683b09
adding qodana version?
Jolanrensen Mar 31, 2022
7862da2
excluding arities from qodana
Jolanrensen Mar 31, 2022
08f8259
updating examples and deprecations for qodana
Jolanrensen Mar 31, 2022
2de0732
pleasing qodana
Jolanrensen Mar 31, 2022
ec78459
moved tuples to separate module
Jolanrensen Apr 1, 2022
4a73aca
forgot qodana
Jolanrensen Apr 1, 2022
d79d61a
qodana works locally, come on github
Jolanrensen Apr 1, 2022
Changes from all commits
62 changes: 62 additions & 0 deletions README.md
@@ -22,6 +22,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [toList and toArray](#tolist-and-toarray-methods)
- [Column infix/operator functions](#column-infixoperator-functions)
- [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
- [Tuples](#tuples)
- [Examples](#examples)
- [Reporting issues/Support](#reporting-issuessupport)
- [Code of Conduct](#code-of-conduct)
@@ -204,6 +205,67 @@ We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately

We have a dedicated example of working with this function in the [Groups example](https://github.com/JetBrains/kotlin-spark-api/blob/main/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Group.kt).

### Tuples

Inspired by [ScalaTuplesInKotlin](https://github.com/Jolanrensen/ScalaTuplesInKotlin), the API introduces a number of helper extension functions
that make working with Scala Tuples a breeze in your Kotlin Spark projects. While working with data classes is encouraged,
Scala Tuples are recommended for pair-like Datasets / RDDs / DStreams, both for their useful helper functions and for Spark performance.
To enable these features, simply add
```kotlin
import org.jetbrains.kotlinx.spark.api.tuples.*
```
to the start of your file.

Tuples can be created in any of the following ways:
```kotlin
val a: Tuple2<Int, Long> = tupleOf(1, 2L)
val b: Tuple3<String, Double, Int> = t("test", 1.0, 2)
val c: Tuple3<Float, String, Int> = 5f X "aaa" X 1
```
Tuples can be expanded and merged like this:
```kotlin
// expand
tupleOf(1, 2).appendedBy(3) == tupleOf(1, 2, 3)
tupleOf(1, 2) + 3 == tupleOf(1, 2, 3)
tupleOf(2, 3).prependedBy(1) == tupleOf(1, 2, 3)
1 + tupleOf(2, 3) == tupleOf(1, 2, 3)

// merge
tupleOf(1, 2) concat tupleOf(3, 4) == tupleOf(1, 2, 3, 4)
tupleOf(1, 2) + tupleOf(3, 4) == tupleOf(1, 2, 3, 4)

// extend tuple instead of merging with it
tupleOf(1, 2).appendedBy(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
tupleOf(1, 2) + tupleOf(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
```

The `EmptyTuple` concept from Scala 3 is also present:
```kotlin
tupleOf(1).dropLast() == tupleOf() == emptyTuple()
```

Finally, the following tuple helper functions are baked in as well (a combined usage sketch follows the list):

- `componentX()` for destructuring: `val (a, b) = tuple`
- `dropLast() / dropFirst()`
- `contains(x)` for `if (x in tuple) { ... }`
- `iterator()` for `for (x in tuple) { ... }`
- `asIterable()`
- `size`
- `get(n) / get(i..j)` for `tuple[1] / tuple[i..j]`
- `getOrNull(n) / getOrNull(i..j)`
- `getAs<T>(n) / getAs<T>(i..j)`
- `getAsOrNull<T>(n) / getAsOrNull<T>(i..j)`
- `copy(_1 = ..., _5 = ...)`
- `first() / last()`
- `_1`, `_6` etc. (instead of `_1()`, `_6()`)
- `zip`
- `dropN() / dropLastN()`
- `takeN() / takeLastN()`
- `splitAtN()`
- `map`
- `cast`
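
As an illustration, here is a minimal sketch that combines several of these helpers. It assumes only the `tuples` import shown above; the exact signatures and return types are inferred from the function names listed here and may differ slightly in the actual API:

```kotlin
import org.jetbrains.kotlinx.spark.api.tuples.*

fun main() {
    val tuple = tupleOf(1, "a", 3.0)

    // componentX() destructuring
    val (i, s, d) = tuple

    // size, property access instead of _2(), and indexed access via get(n)
    println(tuple.size) // 3
    println(tuple._2)   // "a"
    println(tuple[0])   // 1

    // typed access; getAs<T>(n) is assumed to cast the element to T
    val first: Int = tuple.getAs<Int>(0)

    // contains(x) for `in`, and iterator() for `for`
    if ("a" in tuple) {
        for (x in tuple) println(x)
    }

    // copy with positional names, and shrinking by one element
    val copied = tuple.copy(_1 = 42) // tupleOf(42, "a", 3.0)
    val shorter = tuple.dropLast()   // tupleOf(1, "a")
}
```

Together with `map`, `zip`, and the `takeN` / `dropN` family, this makes tuples nearly as pleasant to use as Kotlin's own data classes.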

## Examples

@@ -20,17 +20,18 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
         dsOf(1, 2, 3, 4, 5)
-            .map { it to (it + 2) }
+            .map { it X (it + 2) }
             .withCached {
                 showDS()
 
-                filter { it.first % 2 == 0 }.showDS()
+                filter { it._1 % 2 == 0 }.showDS()
             }
-            .map { c(it.first, it.second, (it.first + it.second) * 2) }
+            .map { it.appendedBy(it._1 + it._2 * 2) }
             .show()
     }
 }
@@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.sql.Row
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
@@ -39,7 +40,7 @@ fun main() {
         }
 
         dsOf(1, 2, 3)
-            .map { c(it, it + 1, it + 2) }
+            .map { t(it, it + 1, it + 2) }
             .to<Row>()
             .select("_1")
             .collectAsList()
@@ -20,12 +20,19 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark {
-        dsOf(c(1, "a"), c(1, "b"), c(2, "c"))
+        dsOf(
+            1 X "a",
+            1 X "b",
+            2 X "c",
+        )
             .groupByKey { it._1 }
-            .reduceGroupsK { a, b -> c(a._1 + b._1, a._2 + b._2) }
+            .reduceGroupsK { a, b ->
+                tupleOf(a._1 + b._1, a._2 + b._2)
+            }
             .show()
     }
 }
@@ -20,6 +20,7 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 
 data class Left(val id: Int, val name: String)
@@ -32,10 +33,12 @@ fun main() {
     val first = dsOf(Left(1, "a"), Left(2, "b"))
     val second = dsOf(Right(1, 100), Right(3, 300))
     first
-        .leftJoin(second, first.col("id").eq(second.col("id")))
+        .leftJoin(second, first.col("id") eq second.col("id"))
         .debugCodegen()
        .also { it.show() }
-        .map { c(it.first.id, it.first.name, it.second?.value) }
+        .map { (left, right) ->
+            left.id X left.name X right?.value
+        }
         .show()
 
 }
@@ -20,36 +20,53 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.api.java.function.ReduceFunction
+import org.apache.spark.sql.Dataset
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
+import scala.*
 
 data class Q<T>(val id: Int, val text: T)
+@Suppress("RedundantLambdaArrow", "UsePropertyAccessSyntax")
 object Main {
 
     @JvmStatic
     fun main(args: Array<String>) {
 
         val spark = SparkSession
             .builder()
             .master("local[2]")
-            .appName("Simple Application").orCreate
-
-        val triples = spark
-            .toDS(listOf(Q(1, 1 to null), Q(2, 2 to "22"), Q(3, 3 to "333")))
-            .map { (a, b) -> a + b.first to b.second?.length }
-            .map { it to 1 }
-            .map { (a, b) -> Triple(a.first, a.second, b) }
+            .appName("Simple Application")
+            .getOrCreate()
+
+        val triples: Dataset<Tuple3<Int, Int?, Int>> = spark
+            .toDS(
+                listOf(
+                    Q(1, 1 X null),
+                    Q(2, 2 X "22"),
+                    Q(3, 3 X "333"),
+                )
+            )
+            .map { (a, b) -> t(a + b._1, b._2?.length) }
+            .map { it: Tuple2<Int, Int?> -> it + 1 } // add counter
 
         val pairs = spark
-            .toDS(listOf(2 to "hell", 4 to "moon", 6 to "berry"))
+            .toDS(
+                listOf(
+                    2 X "hell",
+                    4 X "moon",
+                    6 X "berry",
+                )
+            )
 
         triples
-            .leftJoin(pairs, triples.col("first").multiply(2).eq(pairs.col("first")))
+            .leftJoin(
+                right = pairs,
+                col = triples("_1").multiply(2) eq pairs("_1"),
+            )
             // .also { it.printSchema() }
-            .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
+            .map { (triple, pair) -> Five(triple._1, triple._2, triple._3, pair?._1, pair?._2) }
             .groupByKey { it.a }
             .reduceGroupsK { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) }
-            .map { it.second }
+            .map { it._2 }
             .repartition(1)
             .withCached {
                 write()
@@ -20,16 +20,21 @@
 package org.jetbrains.kotlinx.spark.examples
 
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 fun main() {
     withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
         dsOf(
-            mapOf(1 to c(1, 2, 3), 2 to c(1, 2, 3)),
-            mapOf(3 to c(1, 2, 3), 4 to c(1, 2, 3)),
+            mapOf(1 to t(1, 2, 3), 2 to t(1, 2, 3)),
+            mapOf(3 to t(1, 2, 3), 4 to t(1, 2, 3)),
         )
-            .flatMap { it.toList().map { p -> listOf(p.first, p.second._1, p.second._2, p.second._3) }.iterator() }
+            .flatMap {
+                it.toList()
+                    .map { (first, tuple) -> (first + tuple).toList() }
+                    .iterator()
+            }
             .flatten()
-            .map { c(it) }
+            .map { tupleOf(it) }
             .also { it.printSchema() }
             .distinct()
             .sort("_1")
@@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.examples
 
 import org.apache.spark.sql.Dataset
 import org.jetbrains.kotlinx.spark.api.*
+import org.jetbrains.kotlinx.spark.api.tuples.*
 
 const val MEANINGFUL_WORD_LENGTH = 4
 
@@ -33,15 +34,15 @@ fun main() {
             .flatten()
             .cleanup()
             .groupByKey { it }
-            .mapGroups { k, iter -> k to iter.asSequence().count() }
-            .sort { arrayOf(it.col("second").desc()) }
+            .mapGroups { k, iter -> k X iter.asSequence().count() }
+            .sort { arrayOf(it(colName = "_2").desc()) }
             .limit(20)
-            .map { it.second to it.first }
+            .map { it.swap() }
             .show(false)
     }
 }
 
-fun Dataset<String>.cleanup() =
+fun Dataset<String>.cleanup(): Dataset<String> =
     filter { it.isNotBlank() }
         .map { it.trim(',', ' ', '\n', ':', '.', ';', '?', '!', '"', '\'', '\t', ' ') }
         .filter { !it.endsWith("n’t") }
10 changes: 10 additions & 0 deletions kotlin-spark-api/3.2/pom_2.12.xml
@@ -27,6 +27,10 @@
         <groupId>org.jetbrains.kotlinx.spark</groupId>
         <artifactId>core-3.2_${scala.compat.version}</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.jetbrains.kotlinx.spark</groupId>
+        <artifactId>scala-tuples-in-kotlin</artifactId>
+    </dependency>
 
 
     <!-- Provided dependencies -->
@@ -75,6 +79,7 @@
     <testSourceDirectory>src/test/kotlin</testSourceDirectory>
     <directory>target/${scala.compat.version}</directory>
     <plugins>
+
         <plugin>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-maven-plugin</artifactId>
@@ -93,10 +98,12 @@
                 </execution>
             </executions>
         </plugin>
+
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
         </plugin>
+
         <plugin>
             <groupId>org.jetbrains.dokka</groupId>
             <artifactId>dokka-maven-plugin</artifactId>
@@ -121,17 +128,20 @@
                 </execution>
             </executions>
         </plugin>
+
         <plugin>
             <groupId>io.qameta.allure</groupId>
             <artifactId>allure-maven</artifactId>
             <configuration>
                 <resultsDirectory>${project.basedir}/allure-results/${scala.compat.version}</resultsDirectory>
             </configuration>
         </plugin>
+
         <plugin>
             <groupId>org.jacoco</groupId>
             <artifactId>jacoco-maven-plugin</artifactId>
         </plugin>
+
     </plugins>
 </build>
</project>