Add UDF support #106


Merged: 18 commits into Kotlin:main on Sep 13, 2021

Conversation

@nonpool (Contributor) commented Aug 24, 2021

This PR is a complement to #67.

Solution

After many attempts, I gave up on hacking RowEncoder, because it always produced inexplicable bugs.
When registering a UDF, I instead convert DataTypeWithClass directly to a Spark DataType, and this solution seems to work well.

Problem

Please see UDFRegisterTest.kt line 134: when a complex type is used as the return value of a UDF, we get an exception (the same one as at the line 134 link). This exception also exists in Java (I tested locally on Spark 3.1.1), so we either need to wait for Spark to fix this bug upstream or try to hack the related classes ourselves (probably more effort).

Possible improvements

  • Add more unit tests. Honestly, I don't know the common use cases for UDFs, so any help here is welcome (a minimal usage sketch follows below).
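
For reference, a minimal usage sketch of the API this PR adds (the UDF name and query are illustrative; the sketch assumes the 0-argument register extension quoted later in this thread):

import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    // Register a no-argument UDF; its return-type schema is derived from the
    // reified type parameter and unwrapped to a plain Spark DataType.
    spark.udf().register("answer") { -> 42 }
    spark.sql("SELECT answer()").show()
}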

@nonpool (Contributor, Author) commented Aug 24, 2021

@asm0dey @FelixEngl
After you review these commits, I will add 3.0 support and the related documentation.

@asm0dey (Contributor) commented Aug 24, 2021

Looks like returning complex classes works in Scala: https://stackoverflow.com/a/59293056

Are you sure it can't be supported in Kotlin?

@OptIn(ExperimentalStdlibApi::class)
inline fun <reified R> UDFRegistration.register(name: String, noinline func: () -> R): UDFWrapper0 {
    register(name, UDF0(func), schema(typeOf<R>()).unWrapper())
    // … (remainder of the function omitted in this review excerpt)
@asm0dey (Contributor) commented on this snippet:

It looks like it makes sense to work not with our generated schema but with the schema from an encoder, doesn't it?

@nonpool (Contributor, Author) replied:

Yes. Actually, it is better than hacking RowEncoder to adapt our generated schema when registering a UDF. Reasons: 1. RowEncoder only needs a correct DataType, not the other information in our generated schema. 2. Some Spark operations on a UDF result (e.g. the .as<Int>() action) do not go through RowEncoder, so if we kept the class-hacking approach we would have to hack many classes, with no guarantee that the hacked classes would work well.

@nonpool (Contributor, Author) commented Aug 24, 2021

> Why don't you extract the schema from the generated encoder?

I will move this to another file.

@nonpool (Contributor, Author) commented Aug 24, 2021

> Looks like returning complex classes works in Scala: https://stackoverflow.com/a/59293056
>
> Are you sure it can't be supported in Kotlin?

I think this is a big plan. If we don't have the Java implementation as a reference, this feature would require hacking many classes. I suggest separating this feature into another PR.

@asm0dey (Contributor) commented Aug 24, 2021

> I will move this to another file.

No, no. I meant that we have the encoder method, which allows you to use the schema that was already processed by Spark.

@asm0dey (Contributor) commented Aug 24, 2021

> I think this is a big plan. If we don't have the Java implementation as a reference, this feature would require hacking many classes. I suggest separating this feature into another PR.

But if it works in the Scala API, should we really provide something so limited? I mean, people will just use the Scala API, because here they would get weird exceptions at runtime, which sounds like extremely bad UX. Like "we support UDFs, but only for a couple of classes". And by the way, do we throw an exception if the provided class is just a regular data class or a Java bean class, or will they work?

@nonpool (Contributor, Author) commented Aug 24, 2021

> But if it works in the Scala API, should we really provide something so limited? […] And by the way, do we throw an exception if the provided class is just a regular data class or a Java bean class, or will they work?

I created a simple example for this in my repo spark-udf-explore and reached some conclusions:

  • UDF return types do not support Java beans, in either Java Spark or Scala Spark
  • UDF return types do not support normal Scala classes
  • Only case classes are supported as UDF return types, and only in Scala Spark

Nevertheless, I still want to try my best to give users a complete API. I will try to hack the related classes so that the Kotlin API supports returning data classes; a sketch of that desired end state follows.
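
(Sketch only; per the findings above, consuming such a UDF currently fails at runtime, and the UDF name here is illustrative.)

import org.jetbrains.kotlinx.spark.api.*

data class Pair2(val a: String, val b: Int) // an arbitrary example data class

fun main() = withSpark {
    // Desired end state: a UDF returning a Kotlin data class, mirroring what
    // Scala case classes already get. Consuming the result currently hits the
    // Spark-side exception described earlier in this PR.
    spark.udf().register("makePair") { -> Pair2("a", 1) }
    spark.sql("SELECT makePair()").show()
}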

@nonpool (Contributor, Author) commented Aug 24, 2021

> No, no. I meant that we have the encoder method, which allows you to use the schema that was already processed by Spark.

But our encoder result is wrapped by KDataTypeWrapper, KComplexTypeWrapper, and so on, and these wrapper classes are not supported by RowEncoder or the other Spark native classes that handle DataType. If we registered UDFs with the wrapper classes, we would end up hacking even more classes, for the same reason I gave above.

@asm0dey (Contributor) commented Aug 24, 2021

And that's why, I think, we should potentially use not our own schema but the schema that can be obtained from the encoder.

@nonpool (Contributor, Author) commented Aug 25, 2021

> And that's why, I think, we should potentially use not our own schema but the schema that can be obtained from the encoder.

What does "the schema that can be obtained from the encoder" mean? I think I have already used our encoder to obtain the schema; I just unwrap it when registering the UDF.
Could you provide some pseudo-code to show your intention?

@asm0dey (Contributor) commented Aug 25, 2021

You're calling schema(typeOf<R>()), but you could call encoder<R>().schema()

@nonpool (Contributor, Author) commented Aug 25, 2021

> You're calling schema(typeOf<R>()), but you could call encoder<R>().schema()

Ah... that code was copied, and I had always assumed it was the encoder being used there. ^-^

@asm0dey (Contributor) commented Aug 25, 2021

Copying code is usually just a bad idea, I believe. It dramatically increases the WTFs per second.

@nonpool (Contributor, Author) commented Sep 1, 2021

After testing, calling encoder<R>().schema() to register a UDF turns out not to be a good way. The encoder<R>() method performs two actions:

  1. call schema(typeOf<R>()) to get the schema (of type DataTypeWithClass)
  2. generate the serializer and deserializer from that schema and construct the ExpressionEncoder

So encoder<R>().schema() calls the schema() method of the ExpressionEncoder, which is:

  // The schema after converting `T` to a Spark SQL row. This schema is dependent on the given
  // serializer.
  val schema: StructType = StructType(serializer.map { s =>
    StructField(s.name, s.dataType, s.nullable)
  })

It always wraps the fields in a StructType. If you call encoder<R>().schema() for String, it returns {"type":"struct","fields":[{"name":"value","type":"string","nullable":true,"metadata":{}}]}, but UDF registration only needs StringType. And it is not just String and the primitive types that behave like this; complex types do not work either.

So I think we should call schema(typeOf<R>()), unless there are other changes for the complex types. The difference is illustrated below.
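
(A sketch of the difference; the values in the comments follow the JSON shown above.)

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType
import org.jetbrains.kotlinx.spark.api.*
import kotlin.reflect.typeOf

// schema() and unWrapper() are this PR's helpers; encoder() is the existing
// encoder factory of the Kotlin Spark API.
@OptIn(ExperimentalStdlibApi::class)
fun compareSchemas() {
    val direct: DataType = schema(typeOf<String>()).unWrapper()
    // -> StringType: exactly what UDF registration expects as the return type.

    val viaEncoder: StructType = encoder<String>().schema()
    // -> struct<value: string>, i.e. the row-level StructType built by the
    //    ExpressionEncoder, which cannot be passed as a UDF return type.
}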

@asm0dey (Contributor) commented Sep 1, 2021

On the other hand, we could unwrap this struct, right? That way we could avoid having Spark deal with our own DataTypeWithClass instances.

@nonpool (Contributor, Author) commented Sep 1, 2021

> On the other hand, we could unwrap this struct, right? That way we could avoid having Spark deal with our own DataTypeWithClass instances.

Spark does not directly use our DataTypeWithClass instances, because I unwrap them with this:

fun DataType.unWrapper(): DataType {
    return when (this) {
        // Round-trip through JSON to obtain a plain Spark DataType without our wrapper
        is DataTypeWithClass -> DataType.fromJson(dt().json())
        else -> this
    }
}

@asm0dey (Contributor) commented Sep 1, 2021

Should it work with deeply nested structures?

@nonpool (Contributor, Author) commented Sep 1, 2021

List<String> and List<primitive type> also work well.
Complex types also seem to be handled correctly when registering a UDF. For example:
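
(An illustrative check of this kind; not copied from UDFRegisterTest.kt.)

import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    // A UDF whose return type is List<String>: the schema is derived as an
    // array type and unwrapped before being handed to Spark.
    spark.udf().register("strings") { -> listOf("a", "b", "c") }
    spark.sql("SELECT strings()").show()
}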

@asm0dey (Contributor) commented Sep 1, 2021

OK, I'm fine with the design, let's port it to 3.0 now!

@asm0dey (Contributor) commented Sep 5, 2021

Wowzer, let's just suppress "unused" inspections

@asm0dey (Contributor) commented Sep 5, 2021

Or just ignore it and let me know when you're ready.

@asm0dey merged commit 05a3dca into Kotlin:main on Sep 13, 2021
@Jolanrensen (Collaborator) commented:
This was revamped in #152

See https://github.com/Kotlin/kotlin-spark-api/tree/main#user-defined-functions
and https://github.com/Kotlin/kotlin-spark-api/wiki/UDF.

The original notation was kept intact; we just built around it :)
