Skip to content

Better UDF support #152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5c230ca
created udf delegates and tests
Jolanrensen May 6, 2022
685415b
created udf delegates and tests
Jolanrensen May 6, 2022
fca02f7
Merge branch 'spark-3.2' into udf+
Jolanrensen May 10, 2022
5ba1729
multiple registrations are possible for 1 function
Jolanrensen May 10, 2022
3aa69bd
added emptyDataset() to KSparkSession
Jolanrensen May 10, 2022
40cde30
WIP, col.typed<>(), refactoring register function
Jolanrensen May 30, 2022
e32e89f
wip new udf define methods etc. working on register()
Jolanrensen Jun 2, 2022
aef84d0
register now works with typedUDFs and directly in the call
Jolanrensen Jun 2, 2022
2f3be59
working on unnamed and normal udf version
Jolanrensen Jun 3, 2022
51b3260
wip
Jolanrensen Jun 3, 2022
3f0f8cd
Good luck Qodana.
Jolanrensen Jun 7, 2022
55c64b0
made tests compile. They still need to be adapted
Jolanrensen Jun 7, 2022
a00d230
added vararg udf version which can be created by making a single arra…
Jolanrensen Jun 8, 2022
2c8bd5f
updated to kotlin 1.7.0
Jolanrensen Jun 10, 2022
2b801e9
moved vararg unwrapper to scala since inheriting 20+ scala traits fro…
Jolanrensen Jun 10, 2022
cb68fc8
reworked column functions to prefer typed, so select() can be used in…
Jolanrensen Jun 13, 2022
2ab2436
singleCol()
Jolanrensen Jun 13, 2022
574018e
added deprecations for UDFRegister.kt
Jolanrensen Jun 14, 2022
7d13bc9
Merge branch 'spark-3.2' into udf+
Jolanrensen Jun 14, 2022
883c18d
small fixes and finished (?) udf tests
Jolanrensen Jun 15, 2022
1748916
small fixes and finished (?) udf tests
Jolanrensen Jun 15, 2022
0314280
working on examples
Jolanrensen Jun 15, 2022
9bdfe6e
typedLit and examples
Jolanrensen Jun 17, 2022
cc39f1d
Merge branch 'main' into udf+
Jolanrensen Jun 17, 2022
55d14ee
udaf examples
Jolanrensen Jun 17, 2022
81c692b
vararg examples
Jolanrensen Jun 17, 2022
aee1a79
readme and examples
Jolanrensen Jun 20, 2022
213f0e9
deleted file I forgot
Jolanrensen Jun 21, 2022
1bd2498
in preparation of scala 2.13, suggest Seq instead of WrappedArray, wh…
Jolanrensen Jun 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
- [Tuples](#tuples)
- [Streaming](#streaming)
- [User Defined Functions](#user-defined-functions)
- [Examples](#examples)
- [Reporting issues/Support](#reporting-issuessupport)
- [Code of Conduct](#code-of-conduct)
Expand Down Expand Up @@ -275,6 +276,48 @@ withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { //

For more information, check the [wiki](https://github.com/JetBrains/kotlin-spark-api/wiki/Streaming).

### User Defined Functions

Spark has a way to call functions from SQL using so-called [UDFs](https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html).
Using the Scala/Java API from Kotlin is not that obvious, so we decided to add special UDF support for Kotlin.
This support grew into a typesafe, name-safe, and feature-rich solution for which we will give an example:
```kotlin
// example of creation/naming, and registering of a simple UDF
val plusOne by udf { x: Int -> x + 1 }
plusOne.register()
spark.sql("SELECT plusOne(5)").show()
// +----------+
// |plusOne(5)|
// +----------+
// | 6|
// +----------+

// directly registering
udf.register("plusTwo") { x: Double -> x + 2.0 }
spark.sql("SELECT plusTwo(2.0d)").show()
// +------------+
// |plusTwo(2.0)|
// +------------+
// | 4.0|
// +------------+

// dataset select
val result: Dataset<Int> = myDs.select(
plusOne(col(MyType::age))
)
```

We support:
- a notation close to Spark's
- smart naming (with reflection)
- creation from function references
- typed column operations
- UDAF support and functional creation
- (Unique!) simple vararg UDF support

For more, check the [extensive examples](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/UDFs.kt).
Also, check out the [wiki](https://github.com/Kotlin/kotlin-spark-api/wiki/UDF).

## Examples

For more, check out [examples](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.jetbrains.kotlinx.spark.extensions

import org.apache.spark.sql.api.java.{UDF1, UDF2}

/**
* Allows any simple vararg function reference to be treated as 23 different Scala functions.
* Used to make vararg UDFs for `ScalaUDF`.
*
* @param varargFunc
* @param newArray
* @tparam T
* @tparam Array
* @tparam R
*/
class VarargUnwrapper[T, Array, R](
val varargFunc: UDF1[Array, R],
val newArray: UDF2[Integer, UDF1[Integer, T], Array],
) extends Serializable
with Function0[R]
with Function1[T, R]
with Function2[T, T, R]
with Function3[T, T, T, R]
with Function4[T, T, T, T, R]
with Function5[T, T, T, T, T, R]
with Function6[T, T, T, T, T, T, R]
with Function7[T, T, T, T, T, T, T, R]
with Function8[T, T, T, T, T, T, T, T, R]
with Function9[T, T, T, T, T, T, T, T, T, R]
with Function10[T, T, T, T, T, T, T, T, T, T, R]
with Function11[T, T, T, T, T, T, T, T, T, T, T, R]
with Function12[T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function13[T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function14[T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function15[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function16[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function17[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function18[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function19[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function20[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function21[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R]
with Function22[T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, T, R] {

private def vararg(t: T*): R = varargFunc.call(newArray.call(t.size, { t(_) }))

override def curried: Nothing = throw new UnsupportedOperationException()
override def tupled: Nothing = throw new UnsupportedOperationException()

override def apply(): R = vararg()

override def apply(v0: T): R = vararg(v0)

override def apply(v0: T, v1: T): R = vararg(v0, v1)

override def apply(v0: T, v1: T, v2: T): R = vararg(v0, v1, v2)

override def apply(v0: T, v1: T, v2: T, v3: T): R = vararg(v0, v1, v2, v3)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T): R = vararg(v0, v1, v2, v3, v4)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T): R = vararg(v0, v1, v2, v3, v4, v5)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T): R = vararg(v0, v1, v2, v3, v4, v5, v6)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T, v17: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T, v17: T, v18: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T, v17: T, v18: T, v19: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T, v17: T, v18: T, v19: T, v20: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20)

override def apply(v0: T, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, v7: T, v8: T, v9: T, v10: T, v11: T, v12: T, v13: T, v14: T, v15: T, v16: T, v17: T, v18: T, v19: T, v20: T, v21: T): R = vararg(v0, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21)
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object Main {
triples
.leftJoin(
right = pairs,
col = triples("_1").multiply(2) eq pairs("_1"),
col = triples.col("_1").multiply(2) eq pairs.col("_1"),
)
// .also { it.printSchema() }
.map { (triple, pair) -> Five(triple._1, triple._2, triple._3, pair?._1, pair?._2) }
Expand Down
Loading