
Conversation

@FelixEngl
Contributor

Changes to old pull request

Correct branching.

Short description

Add a Kotlin-styled way to create and call UDFs in a safer manner.

Possible improvements

  • Add more unit tests?
  • Publish the generator code for the functions (but I don't know where I should put it).

Future plans

  • Add advanced type checking before calling a function, by comparing the schemas of the function parameters with the column schema?
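A rough sketch of what that pre-call check could look like. All helper names here are illustrative assumptions, not existing API; only Spark's `DataType` and `StructField` are real:

```kotlin
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField

// Hypothetical sketch of the advanced type check: before invoking a registered
// function, compare each declared parameter type against the schema of the
// column it will be applied to.
fun checkUdfCall(parameterTypes: List<DataType>, columns: List<StructField>) {
    require(parameterTypes.size == columns.size) {
        "Expected ${parameterTypes.size} arguments but got ${columns.size} columns"
    }
    parameterTypes.zip(columns).forEach { (expected, column) ->
        require(expected == column.dataType()) {
            "Parameter of type $expected cannot take column '${column.name()}' of type ${column.dataType()}"
        }
    }
}
```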

/**
 * A shortcut for [KSparkSession.spark].udf()
 */
inline fun KSparkSession.udf(): UDFRegistration = spark.udf()
Contributor

We definitely don't need `inline`.
And why don't we put it inside `KSparkSession`?
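For illustration, moving it inside `KSparkSession` might look roughly like this. This is only a sketch of the suggestion, not the final API; the exact shape of `KSparkSession` in the project may differ:

```kotlin
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.UDFRegistration

// Sketch: expose the registration as a member instead of a top-level
// inline extension function. KSparkSession is assumed to wrap a
// SparkSession as `spark`, as elsewhere in this project.
class KSparkSession(val spark: SparkSession) {
    /** A shortcut for [SparkSession.udf]. */
    val udf: UDFRegistration get() = spark.udf()
}
```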

@@ -0,0 +1,803 @@
@file:Suppress("NOTHING_TO_INLINE", "DuplicatedCode", "MemberVisibilityCanBePrivate")
Contributor

If this file is generated (I really hope it is), please provide us with the generating code in the comments. Thanks!

Contributor Author

> Sorry for not answering for a long time, I was busy with another project. It's a very good PR, but it needs several improvements.

No problem, and I thought so (especially regarding the improvements). This was only some kind of strange brainwave of mine, and I hacked something together.

I will provide the generator code after some clean-up; I only hacked it together for my own project and thought you might like/need it. :D
If you have further suggestions, please let me know; I'll try to add them to the implementation.

(Sorry that I can't do it today, but it's 00:30 in Germany and I have a meeting in the morning.)

Contributor

There is no hurry at all, thank you for your effort.
What do you think: is there any way to support the other classes natively supported by Spark?

Contributor Author

In my fork there is a branch for calling the Scala-Java conversion wrappers. (I didn't open a merge request because I'm not really satisfied with the tests for it. I have problems testing the mutable Scala types for changes; if you could help me with that [or give me some tips on how to write the tests], I would be thankful.)

I think we can achieve some kind of "auto-conversion" by using `reified` type parameters and calling these wrapper functions (at least for the function wrappers).
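A sketch of that reified "auto-conversion" idea: wrap the user's lambda so a Kotlin `List` result goes through a conversion wrapper before Spark sees it. Here `toScalaSeq` stands in for one of the conversion wrappers from the fork's branch and `schema(...)` for the project's encoder helper; both are assumptions, and `UDFRegistration.register(name, UDF1, DataType)` is Spark's own Java API:

```kotlin
import org.apache.spark.sql.UDFRegistration
import org.apache.spark.sql.api.java.UDF1
import kotlin.reflect.typeOf

// Sketch: register a (T) -> List<R> lambda so its result is auto-converted
// to a Scala Seq. `toScalaSeq` and `schema` are hypothetical placeholders
// for the conversion wrappers and the encoder helper mentioned above.
inline fun <reified T, reified R> UDFRegistration.registerConverting(
    name: String,
    noinline func: (T) -> List<R>,
) {
    register(
        name,
        UDF1<T, scala.collection.Seq<R>> { t -> func(t).toScalaSeq() },
        schema(typeOf<List<R>>()),
    )
}
```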

I wrote this text on my smartphone, please excuse the typing errors.

Contributor

I think if you would file a PR, we could at least think about how to test something :)

Contributor Author

I'll take care of the code around Christmas. Right now I don't have enough time to work on my PRs besides my current tasks.
I also committed the PR for the wrappers (see #72).

Thank you for your feedback in advance! 😄

@asm0dey
Contributor

asm0dey commented Nov 29, 2020

Sorry for not answering for a long time, I was busy with another project. It's a very good PR, but it needs several improvements.

@asm0dey
Contributor

asm0dey commented Nov 30, 2020

Please forward-port this to version 3.0.

@asm0dey
Contributor

asm0dey commented Nov 30, 2020

I've run into the following trouble. I tried to write a more complex test:

should("also work with datasets") {
    listOf("a" to 1, "b" to 2).toDS().toDF().createOrReplaceTempView("test1")
    udf.register<String, Int, Int>("stringIntDiff") { a, b ->
        a[0].toInt() - b
    }
    spark.sql("select stringIntDiff(first, second) from test1").show()
}

and it fails with:

IntegerType (of class org.apache.spark.sql.KSimpleTypeWrapper)
scala.MatchError: IntegerType (of class org.apache.spark.sql.KSimpleTypeWrapper)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:225)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:222)
	at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1728)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:185)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:181)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:61)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
	at org.jetbrains.kotlinx.spark.api.UDFRegisterTest$1$1$3$1$2.invokeSuspend(UDFRegisterTest.kt:104)
……

Looks like encoders won't work for our primitive type wrappers.
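A possible direction, judging from the stack trace: `RowEncoder` pattern-matches on Spark's concrete `DataType` instances, so `KSimpleTypeWrapper` falls through to a `MatchError`. A hedged sketch of an unwrapping step, which mirrors the `DataType.fromJson((... as DataTypeWithClass).dt().json())` replacement used in the follow-up commits below; `DataTypeWithClass` and `dt()` are this project's wrapper types, `fromJson` is Spark's own parser:

```kotlin
import org.apache.spark.sql.types.DataType

// Sketch: round-trip the wrapper through its JSON representation so Spark's
// encoders only ever see a plain DataType instead of KSimpleTypeWrapper.
fun DataType.unwrapped(): DataType =
    if (this is DataTypeWithClass) DataType.fromJson(dt().json()) else this
```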

asm0dey and others added 5 commits December 1, 2020 01:36
# Conflicts:
#	kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UDFRegister.kt
#	kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/UDFRegisterTest.kt
@asm0dey asm0dey force-pushed the master branch 2 times, most recently from db91ee0 to aa11744 Compare May 7, 2021 16:01
@asm0dey asm0dey force-pushed the main branch 4 times, most recently from a9da30c to 8e7523a Compare July 15, 2021 20:53
nonpool added a commit to nonpool/kotlin-spark-api that referenced this pull request Aug 23, 2021
@nonpool nonpool mentioned this pull request Aug 24, 2021
asm0dey added a commit that referenced this pull request Sep 13, 2021
* copy code from #67

* remove hacked RowEncoder.scala

* replace all schema(typeOf<R>()) to DataType.fromJson((schema(typeOf<R>()) as DataTypeWithClass).dt().json())

* add return udf data class test

* change test

* add in dataset test for calling the UDF-Wrapper

* add the same exception link

* refactor unWrapper

* add test for udf return a List

* make the test simpler

* add License

* add UDFRegister for 3.0

* remove useless import

* resolved deprecated method

* [experimental] add CatalystTypeConverters.scala for hacked it to implement UDF return data class

* [experimental] implement UDF return data class

* fix code inspection issue

* Adds suppre unused

Co-authored-by: can wang <[email protected]>
Co-authored-by: Pasha Finkelshteyn <[email protected]>
@Jolanrensen
Collaborator

Closing this. We don't support Spark 2 anymore, and based on the UDFs for Spark 3 we implemented the registration in #152 (released in v1.2.0).
Thanks for the help and inspiration, @FelixEngl!
