From bfbc6ae4fab6b6252a00ec27ffbf810ea475173b Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 11:53:09 +0100 Subject: [PATCH 01/28] adding kdocs --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 32935f40..075d908c 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -21,6 +21,7 @@ package org.jetbrains.kotlinx.spark.api +import org.apache.hadoop.shaded.org.apache.commons.math3.exception.util.ArgUtils import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.function.* @@ -192,15 +193,33 @@ private fun kotlinClassEncoder(schema: DataType, kClass: KClass<*>): Encoder ) } +/** + * (Kotlin-specific) + * Returns a new Dataset that contains the result of applying [func] to each element. + */ inline fun Dataset.map(noinline func: (T) -> R): Dataset = map(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset by first applying a function to all elements of this Dataset, + * and then flattening the results. + */ inline fun Dataset.flatMap(noinline func: (T) -> Iterator): Dataset = flatMap(func, encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset by flattening. This means that a Dataset of an iterable such as + * `listOf(listOf(1, 2, 3), listOf(4, 5, 6))` will be flattened to a Dataset of `listOf(1, 2, 3, 4, 5, 6).` + */ inline fun > Dataset.flatten(): Dataset = flatMap(FlatMapFunction { it.iterator() }, encoder()) +/** + * (Kotlin-specific) + * Returns a [KeyValueGroupedDataset] where the data is grouped by the given key [func]. + */ inline fun Dataset.groupByKey(noinline func: (T) -> R): KeyValueGroupedDataset = groupByKey(MapFunction(func), encoder()) From ced2ef87e916dc33e6594a839cb22c64b34768d2 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 12:05:23 +0100 Subject: [PATCH 02/28] more docs --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 43 ++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 075d908c..134f17a5 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -211,7 +211,7 @@ inline fun Dataset.flatMap(noinline func: (T) -> Iterator): /** * (Kotlin-specific) * Returns a new Dataset by flattening. This means that a Dataset of an iterable such as - * `listOf(listOf(1, 2, 3), listOf(4, 5, 6))` will be flattened to a Dataset of `listOf(1, 2, 3, 4, 5, 6).` + * `listOf(listOf(1, 2, 3), listOf(4, 5, 6))` will be flattened to a Dataset of `listOf(1, 2, 3, 4, 5, 6)`. */ inline fun > Dataset.flatten(): Dataset = flatMap(FlatMapFunction { it.iterator() }, encoder()) @@ -223,18 +223,59 @@ inline fun > Dataset.flatten(): Dataset = inline fun Dataset.groupByKey(noinline func: (T) -> R): KeyValueGroupedDataset = groupByKey(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Returns a new Dataset that contains the result of applying [func] to each partition. 
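 *
 * A minimal usage sketch, assuming a hypothetical `ds: Dataset<String>`:
 * ```kotlin
 * ds.mapPartitions { partition -> partition.asSequence().map { it.length }.iterator() }
 * ```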
+ */ inline fun Dataset.mapPartitions(noinline func: (Iterator) -> Iterator): Dataset = mapPartitions(func, encoder()) +/** + * (Kotlin-specific) + * Filters rows to eliminate [null] values. + */ @Suppress("UNCHECKED_CAST") fun Dataset.filterNotNull(): Dataset = filter { it != null } as Dataset +/** + * Returns a new [KeyValueGroupedDataset] where the given function [func] has been applied + * to the data. The grouping key is unchanged by this. + * + * ```kotlin + * // Create values grouped by key from a Dataset> + * ds.groupByKey { it._1 }.mapValues { it._2 } + * ``` + */ inline fun KeyValueGroupedDataset.mapValues(noinline func: (VALUE) -> R): KeyValueGroupedDataset = mapValues(MapFunction(func), encoder()) +/** + * (Kotlin-specific) + * Applies the given function to each group of data. For each unique group, the function will + * be passed the group key and an iterator that contains all the elements in the group. The + * function can return an element of arbitrary type which will be returned as a new [Dataset]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [Dataset]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [org.apache.spark.sql.expressions.Aggregator]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling [toList]) unless they are sure that this is possible given the memory + * constraints of their cluster. + */ inline fun KeyValueGroupedDataset.mapGroups(noinline func: (KEY, Iterator) -> R): Dataset = mapGroups(MapGroupsFunction(func), encoder()) +/** + * (Kotlin-specific) + * Reduces the elements of each group of data using the specified binary function. + * The given function must be commutative and associative or the result may be non-deterministic. + * + * Note that you need to use [reduceGroupsK] always instead of the Java- or Scala-specific + * [KeyValueGroupedDataset.reduceGroups] to make the compiler work. + */ inline fun KeyValueGroupedDataset.reduceGroupsK(noinline func: (VALUE, VALUE) -> VALUE): Dataset> = reduceGroups(ReduceFunction(func)) .map { t -> t._1 to t._2 } From 9de5ae7a1452fb80f8ff1c09bfa6a640232a2d59 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 12:45:14 +0100 Subject: [PATCH 03/28] more docs --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 129 +++++++++++++++++- 1 file changed, 127 insertions(+), 2 deletions(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 134f17a5..59d12e6b 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -288,23 +288,63 @@ inline fun KeyValueGroupedDataset.reduc inline fun Dataset.reduceK(noinline func: (T, T) -> T): T = reduce(ReduceFunction(func)) +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Tuple2._1] values. + */ @JvmName("takeKeysTuple2") inline fun Dataset>.takeKeys(): Dataset = map { it._1() } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Pair.first] values. 
+ */ inline fun Dataset>.takeKeys(): Dataset = map { it.first } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "keys" or [Arity2._1] values. + */ @JvmName("takeKeysArity2") inline fun Dataset>.takeKeys(): Dataset = map { it._1 } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Tuple2._2] values. + */ @JvmName("takeValuesTuple2") inline fun Dataset>.takeValues(): Dataset = map { it._2() } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Pair.second] values. + */ inline fun Dataset>.takeValues(): Dataset = map { it.second } +/** + * (Kotlin-specific) + * Maps the Dataset to only retain the "values" or [Arity2._2] values. + */ @JvmName("takeValuesArity2") inline fun Dataset>.takeValues(): Dataset = map { it._2 } - +/** + * (Kotlin-specific) + * Applies the given function to each group of data. For each unique group, the function will + * be passed the group key and an iterator that contains all the elements in the group. The + * function can return an iterator containing elements of an arbitrary type which will be returned + * as a new [Dataset]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [Dataset]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [org.apache.spark.sql.expressions.Aggregator]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling [toList]) unless they are sure that this is possible given the memory + * constraints of their cluster. + */ inline fun KeyValueGroupedDataset.flatMapGroups( noinline func: (key: K, values: Iterator) -> Iterator, ): Dataset = flatMapGroups( @@ -312,12 +352,57 @@ inline fun KeyValueGroupedDataset.flatMapGroups( encoder() ) +/** + * (Kotlin-specific) + * Returns the group state value if it exists, else [null]. + * This is comparable to [GroupState.getOption], but instead utilises Kotlin's nullability features + * to get the same result. + */ fun GroupState.getOrNull(): S? = if (exists()) get() else null +/** + * (Kotlin-specific) + * Allows the group state object to be used as a delegate. Will be [null] if it does not exist. + * + * For example: + * ```kotlin + * groupedDataset.mapGroupsWithState(GroupStateTimeout.NoTimeout()) { key, values, state: GroupState -> + * var s by state + * ... + * } + * ``` + */ operator fun GroupState.getValue(thisRef: Any?, property: KProperty<*>): S? = getOrNull() -operator fun GroupState.setValue(thisRef: Any?, property: KProperty<*>, value: S?): Unit = update(value) +/** + * (Kotlin-specific) + * Allows the group state object to be used as a delegate. Will be [null] if it does not exist. + * + * For example: + * ```kotlin + * groupedDataset.mapGroupsWithState(GroupStateTimeout.NoTimeout()) { key, values, state: GroupState -> + * var s by state + * ... + * } + * ``` + */ +operator fun GroupState.setValue(thisRef: Any?, property: KProperty<*>, value: S?): Unit = update(value) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. 
For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [org.apache.spark.sql.streaming.GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.mapGroupsWithState( noinline func: (key: K, values: Iterator, state: GroupState) -> U, ): Dataset = mapGroupsWithState( @@ -326,6 +411,22 @@ inline fun KeyValueGroupedDataset.mapGroupsWi encoder() ) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [org.apache.spark.sql.streaming.GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. + * @param timeoutConf Timeout configuration for groups that do not receive data for a while. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.mapGroupsWithState( timeoutConf: GroupStateTimeout, noinline func: (key: K, values: Iterator, state: GroupState) -> U, @@ -336,6 +437,23 @@ inline fun KeyValueGroupedDataset.mapGroupsWi timeoutConf ) +/** + * (Kotlin-specific) + * Applies the given function to each group of data, while maintaining a user-defined per-group + * state. The result Dataset will represent the objects returned by the function. + * For a static batch Dataset, the function will be invoked once per group. For a streaming + * Dataset, the function will be invoked for each group repeatedly in every trigger, and + * updates to each group's state will be saved across invocations. + * See [GroupState] for more details. + * + * @param S The type of the user-defined state. Must be encodable to Spark SQL types. + * @param U The type of the output objects. Must be encodable to Spark SQL types. + * @param func Function to be called on every group. + * @param outputMode The output mode of the function. + * @param timeoutConf Timeout configuration for groups that do not receive data for a while. + * + * See [Encoder] for more details on what types are encodable to Spark SQL. + */ inline fun KeyValueGroupedDataset.flatMapGroupsWithState( outputMode: OutputMode, timeoutConf: GroupStateTimeout, @@ -348,6 +466,13 @@ inline fun KeyValueGroupedDataset.flatMapGrou timeoutConf ) +/** + * (Kotlin-specific) + * Applies the given function to each cogrouped data. For each unique group, the function will + * be passed the grouping key and 2 iterators containing all elements in the group from + * [Dataset] [this] and [other]. The function can return an iterator containing elements of an + * arbitrary type which will be returned as a new [Dataset]. 
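 *
 * A minimal usage sketch, assuming two hypothetical [KeyValueGroupedDataset]s `left` and `right` that share a `String` key type:
 * ```kotlin
 * left.cogroup(right) { key, a, b ->
 *     listOf("$key: ${a.asSequence().count()} vs ${b.asSequence().count()}").iterator()
 * }
 * ```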
+ */ inline fun KeyValueGroupedDataset.cogroup( other: KeyValueGroupedDataset, noinline func: (key: K, left: Iterator, right: Iterator) -> Iterator, From 15d41b0f5db720b9d407335d5ea69e4f789a9e22 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 15:38:43 +0100 Subject: [PATCH 04/28] more docs and deprecated downcast() --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 59d12e6b..6c656046 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -482,13 +482,69 @@ inline fun KeyValueGroupedDataset.cogroup( encoder() ) +/** DEPRECATED: Use [as] or [to] for this. */ +@Deprecated( + message = "Deprecated, since we already have `as`() and to().", + replaceWith = ReplaceWith("this.to()"), + level = DeprecationLevel.ERROR, +) inline fun Dataset.downcast(): Dataset = `as`(encoder()) + +/** + * (Kotlin-specific) + * Returns a new Dataset where each record has been mapped on to the specified type. The + * method used to map columns depend on the type of [R]: + * - When [R] is a class, fields for the class will be mapped to columns of the same name + * (case sensitivity is determined by [spark.sql.caseSensitive]). + * - When [R] is a tuple, the columns will be mapped by ordinal (i.e. the first column will + * be assigned to `_1`). + * - When [R] is a primitive type (i.e. [String], [Int], etc.), then the first column of the + * `DataFrame` will be used. + * + * If the schema of the Dataset does not match the desired [R] type, you can use [Dataset.select]/[selectTyped] + * along with [Dataset.alias] or [as]/[to] to rearrange or rename as required. + * + * Note that [as]/[to] only changes the view of the data that is passed into typed operations, + * such as [map], and does not eagerly project away any columns that are not present in + * the specified class. + * + * @see to as alias for [as] + */ inline fun Dataset<*>.`as`(): Dataset = `as`(encoder()) + +/** + * (Kotlin-specific) + * Returns a new Dataset where each record has been mapped on to the specified type. The + * method used to map columns depend on the type of [R]: + * - When [R] is a class, fields for the class will be mapped to columns of the same name + * (case sensitivity is determined by [spark.sql.caseSensitive]). + * - When [R] is a tuple, the columns will be mapped by ordinal (i.e. the first column will + * be assigned to `_1`). + * - When [R] is a primitive type (i.e. [String], [Int], etc.), then the first column of the + * `DataFrame` will be used. + * + * If the schema of the Dataset does not match the desired [R] type, you can use [Dataset.select]/[selectTyped] + * along with [Dataset.alias] or [as]/[to] to rearrange or rename as required. + * + * Note that [as]/[to] only changes the view of the data that is passed into typed operations, + * such as [map], and does not eagerly project away any columns that are not present in + * the specified class. + * + * @see as as alias for [to] + */ inline fun Dataset<*>.to(): Dataset = `as`(encoder()) -inline fun Dataset.forEach(noinline func: (T) -> Unit) = foreach(ForeachFunction(func)) +/** + * (Kotlin-specific) + * Applies a function [func] to all rows. 
+ */ +inline fun Dataset.forEach(noinline func: (T) -> Unit): Unit = foreach(ForeachFunction(func)) -inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit) = +/** + * (Kotlin-specific) + * Runs [func] on each partition of this Dataset. + */ +inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit): Unit = foreachPartition(ForeachPartitionFunction(func)) /** From 195da8daec512ca86710063573709525e940a51b Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 16:46:18 +0100 Subject: [PATCH 05/28] 2 todo's left, the rest of ApiV1.kt is now kdocced --- .../org/jetbrains/kotlinx/spark/api/ApiV1.kt | 69 ++++++++++++++----- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 6c656046..09c64213 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -37,6 +37,7 @@ import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions import scala.Product import scala.Tuple2 import scala.reflect.ClassTag +import scala.reflect.api.TypeTags.TypeTag import java.beans.PropertyDescriptor import java.math.BigDecimal import java.sql.Date @@ -83,7 +84,7 @@ import kotlin.reflect.full.primaryConstructor import kotlin.to @JvmField -val ENCODERS = mapOf, Encoder<*>>( +val ENCODERS: Map, Encoder<*>> = mapOf( Boolean::class to BOOLEAN(), Byte::class to BYTE(), Short::class to SHORT(), @@ -165,6 +166,9 @@ inline fun List.toDS(spark: SparkSession): Dataset = @OptIn(ExperimentalStdlibApi::class) inline fun encoder(): Encoder = generateEncoder(typeOf(), T::class) +/** + * @see encoder + */ fun generateEncoder(type: KType, cls: KClass<*>): Encoder { @Suppress("UNCHECKED_CAST") return when { @@ -173,7 +177,8 @@ fun generateEncoder(type: KType, cls: KClass<*>): Encoder { } as Encoder } -private fun isSupportedClass(cls: KClass<*>): Boolean = cls.isData +private fun isSupportedClass(cls: KClass<*>): Boolean = + cls.isData || cls.isSubclassOf(Map::class) || cls.isSubclassOf(Iterable::class) || cls.isSubclassOf(Product::class) @@ -550,18 +555,25 @@ inline fun Dataset.forEachPartition(noinline func: (Iterator) /** * It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that */ -fun Dataset.debugCodegen() = also { KSparkExtensions.debugCodegen(it) } +fun Dataset.debugCodegen(): Dataset = also { KSparkExtensions.debugCodegen(it) } -val SparkSession.sparkContext +/** + * Returns the Spark context associated with this Spark session. + */ +val SparkSession.sparkContext: SparkContext get() = KSparkExtensions.sparkContext(this) /** * It's hard to call `Dataset.debug` from kotlin, so here is utility for that */ -fun Dataset.debug() = also { KSparkExtensions.debug(it) } +fun Dataset.debug(): Dataset = also { KSparkExtensions.debug(it) } @Suppress("FunctionName") -@Deprecated("Changed to \"`===`\" to better reflect Scala API.", ReplaceWith("this `===` c")) +@Deprecated( + message = "Changed to \"`===`\" to better reflect Scala API.", + replaceWith = ReplaceWith("this `===` c"), + level = DeprecationLevel.ERROR, +) infix fun Column.`==`(c: Column) = `$eq$eq$eq`(c) /** @@ -889,7 +901,10 @@ operator fun Column.rem(other: Any): Column = `$percent`(other) */ operator fun Column.get(key: Any): Column = getItem(key) -fun lit(a: Any) = functions.lit(a) +// TODO deprecate? 
+fun lit(a: Any): Column = functions.lit(a) + +fun typedLit(literal: Any): Column = functions.lit(literal) /** * Provides a type hint about the expected return value of this column. This information can @@ -996,8 +1011,13 @@ inline fun Dataset.withCached( return cached.executeOnCached().also { cached.unpersist(blockingUnpersist) } } -inline fun Dataset.toList() = KSparkExtensions.collectAsList(to()) -inline fun Dataset<*>.toArray(): Array = to().collect() as Array +/** + * TODO + */ +inline fun Dataset<*>.toList(): List = to().collectAsList() as List +inline fun Dataset<*>.toArray(): Array = to().collect() as Array +//inline fun Dataset.toList() = KSparkExtensions.collectAsList(to()) +//inline fun Dataset<*>.toArray(): Array = to().collect() as Array /** * Selects column based on the column name and returns it as a [Column]. @@ -1014,7 +1034,6 @@ operator fun Dataset.invoke(colName: String): Column = col(colName) * ``` * @see invoke */ - @Suppress("UNCHECKED_CAST") inline fun Dataset.col(column: KProperty1): TypedColumn = col(column.name).`as`() as TypedColumn @@ -1129,6 +1148,14 @@ inline fun = mapOf()): DataType { val primitiveSchema = knownDataTypes[type.classifier] @@ -1228,15 +1255,24 @@ fun schema(type: KType, map: Map = mapOf()): DataType { } } +/** + * The entry point to programming Spark with the Dataset and DataFrame API. + * + * @see org.apache.spark.sql.SparkSession + */ typealias SparkSession = org.apache.spark.sql.SparkSession -fun SparkContext.setLogLevel(level: SparkLogLevel) = setLogLevel(level.name) +/** + * Control our logLevel. This overrides any user-defined log settings. + * @param level The desired log level as [SparkLogLevel]. + */ +fun SparkContext.setLogLevel(level: SparkLogLevel): Unit = setLogLevel(level.name) enum class SparkLogLevel { ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN } -private val knownDataTypes = mapOf( +private val knownDataTypes: Map, DataType> = mapOf( Byte::class to DataTypes.ByteType, Short::class to DataTypes.ShortType, Int::class to DataTypes.IntegerType, @@ -1248,7 +1284,7 @@ private val knownDataTypes = mapOf( LocalDate::class to `DateType$`.`MODULE$`, Date::class to `DateType$`.`MODULE$`, Timestamp::class to `TimestampType$`.`MODULE$`, - Instant::class to `TimestampType$`.`MODULE$` + Instant::class to `TimestampType$`.`MODULE$`, ) private fun transitiveMerge(a: Map, b: Map): Map { @@ -1258,11 +1294,12 @@ private fun transitiveMerge(a: Map, b: Map): Map(val f: (T) -> R) : (T) -> R { + private val values = ConcurrentHashMap() - override fun invoke(x: T) = - values.getOrPut(x, { f(x) }) + + override fun invoke(x: T): R = values.getOrPut(x) { f(x) } } private fun ((T) -> R).memoize(): (T) -> R = Memoize1(this) -private val memoizedSchema = { x: KType -> schema(x) }.memoize() +private val memoizedSchema: (KType) -> DataType = { x: KType -> schema(x) }.memoize() From 0b6c10c97f879175cc59fb0c4ba75fb8fa0a172b Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 20:11:58 +0100 Subject: [PATCH 06/28] 1 todo left, lit() --- .../kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 09c64213..85177d02 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -904,8 +904,6 @@ operator 
fun Column.get(key: Any): Column = getItem(key) // TODO deprecate? fun lit(a: Any): Column = functions.lit(a) -fun typedLit(literal: Any): Column = functions.lit(literal) - /** * Provides a type hint about the expected return value of this column. This information can * be used by operations such as `select` on a [Dataset] to automatically convert the @@ -1012,12 +1010,14 @@ inline fun Dataset.withCached( } /** - * TODO + * Collects the dataset as list where each item has been mapped to type [T]. */ inline fun Dataset<*>.toList(): List = to().collectAsList() as List + +/** + * Collects the dataset as Array where each item has been mapped to type [T]. + */ inline fun Dataset<*>.toArray(): Array = to().collect() as Array -//inline fun Dataset.toList() = KSparkExtensions.collectAsList(to()) -//inline fun Dataset<*>.toArray(): Array = to().collect() as Array /** * Selects column based on the column name and returns it as a [Column]. From a298239c0f9dd480a9f15d10524709af31712878 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Wed, 16 Feb 2022 14:45:23 +0100 Subject: [PATCH 07/28] last doc done --- .../kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt index 85177d02..e679f561 100644 --- a/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt +++ b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt @@ -901,7 +901,16 @@ operator fun Column.rem(other: Any): Column = `$percent`(other) */ operator fun Column.get(key: Any): Column = getItem(key) -// TODO deprecate? +/** + * Creates a [Column] of literal value. + * + * The passed in object is returned directly if it is already a [Column]. + * If the object is a Scala Symbol, it is converted into a [Column] also. + * Otherwise, a new [Column] is created to represent the literal value. + * + * This is just a shortcut to the function from [org.apache.spark.sql.functions]. + * For all the functions, simply add `import org.apache.spark.sql.functions.*` to your file. + */ fun lit(a: Any): Column = functions.lit(a) /** From 9c1addc06816bbd56f3b513e1ded61ab7f05c258 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Tue, 15 Feb 2022 21:31:28 +0100 Subject: [PATCH 08/28] updated badge to official as suggested in https://github.com/JetBrains/kotlin-spark-api/issues/110 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f090f0cf..79c669e4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.2) [![official JetBrains project](http://jb.gg/badges/incubator.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) +# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.0.2) [![official JetBrains project](http://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) Your next API to work with [Apache Spark](https://spark.apache.org/). 
From 0cc61c02b941935606977bd80d54141598e9c84c Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 12:09:07 +0100 Subject: [PATCH 16/28] added docs for iterators --- .../main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt b/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt index b93ce377..9f7de351 100644 --- a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt +++ b/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt @@ -57,9 +57,13 @@ class FilteringIterator( done() } } + +/** Maps the values of the iterator lazily using [func]. */ fun Iterator.map(func: (T) -> R): Iterator = MappingIterator(this, func) +/** Filters the values of the iterator lazily using [func]. */ fun Iterator.filter(func: (T) -> Boolean): Iterator = FilteringIterator(this, func) +/** Partitions the values of the iterator lazily in groups of [size]. 
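 *
 * A minimal usage sketch with a plain Kotlin iterator:
 * ```kotlin
 * listOf(1, 2, 3, 4, 5).iterator().partition(2) // an Iterator<List<Int>> of size-2 groups
 * ```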
*/ fun Iterator.partition(size: Int): Iterator> = PartitioningIterator(this, size) From a9db561eb2d06378137b7f38b2c641ade593e28c Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 12:47:38 +0100 Subject: [PATCH 17/28] attempting to create github action to generate dokka docs --- .github/workflows/generate_docs.yml | 25 +++++++++++++++++++++++++ pom.xml | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/generate_docs.yml diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml new file mode 100644 index 00000000..0017ad46 --- /dev/null +++ b/.github/workflows/generate_docs.yml @@ -0,0 +1,25 @@ +name: Generate and publish docs + +on: + push: + branches: + - "spark-3.2" + pull_request: + branches: + - "spark-3.2" + +jobs: + generate-and-publish-docs: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 11 + uses: actions/setup-java@v1 + with: + distributions: adopt + java-version: 11 + check-latest: true + - name: Generate docs + run: ./mvnw clean package site -Dmaven.test.skip=true + # TODO create branch and copy the docs over \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4f0974c5..64cb115d 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ 1.5.30 - 1.4.32 + 1.6.10 0.16.0 4.6.0 1.0.1 From 015411f3042324dbc50c718e0164d5cb4acb535b Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 13:32:11 +0100 Subject: [PATCH 18/28] merging common into the api so the docs are merged --- .github/workflows/generate_docs.yml | 2 +- kotlin-spark-api/3.2/pom_2.12.xml | 5 +- .../jetbrains/kotlinx/spark/api/Iterators.kt | 0 .../jetbrains/kotlinx/spark/api/VarArities.kt | 0 kotlin-spark-api/common/pom.xml | 56 ------------------- pom.xml | 6 -- 6 files changed, 2 insertions(+), 67 deletions(-) rename kotlin-spark-api/{common => 3.2}/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt (100%) rename kotlin-spark-api/{common => 3.2}/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt (100%) delete mode 100644 kotlin-spark-api/common/pom.xml diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 0017ad46..9ca5d938 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -22,4 +22,4 @@ jobs: check-latest: true - name: Generate docs run: ./mvnw clean package site -Dmaven.test.skip=true - # TODO create branch and copy the docs over \ No newline at end of file + # TODO create branch and copy the docs over from kotlin-spark-api/3.2/target/dokka \ No newline at end of file diff --git a/kotlin-spark-api/3.2/pom_2.12.xml b/kotlin-spark-api/3.2/pom_2.12.xml index 7195f912..756d9c2b 100644 --- a/kotlin-spark-api/3.2/pom_2.12.xml +++ b/kotlin-spark-api/3.2/pom_2.12.xml @@ -27,10 +27,7 @@ org.jetbrains.kotlinx.spark core-3.2_${scala.compat.version} - - org.jetbrains.kotlinx.spark - kotlin-spark-api-common - + diff --git a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt similarity index 100% rename from kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt rename to kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Iterators.kt diff --git a/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt b/kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt 
similarity index 100% rename from kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt rename to kotlin-spark-api/3.2/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt diff --git a/kotlin-spark-api/common/pom.xml b/kotlin-spark-api/common/pom.xml deleted file mode 100644 index 19959fdb..00000000 --- a/kotlin-spark-api/common/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - 4.0.0 - - Kotlin Spark API: Common - kotlin-spark-api-common - Kotlin API for Apache Spark: common parts - - org.jetbrains.kotlinx.spark - kotlin-spark-api-parent - 1.0.4-SNAPSHOT - ../.. - - - - - org.jetbrains.kotlin - kotlin-stdlib-jdk8 - - - - - src/main/kotlin - src/test/kotlin - - - org.jetbrains.kotlin - kotlin-maven-plugin - - - org.jetbrains.dokka - dokka-maven-plugin - ${dokka.version} - - 8 - - - - dokka - - dokka - - pre-site - - - javadocjar - - javadocJar - - pre-integration-test - - - - - - diff --git a/pom.xml b/pom.xml index 64cb115d..47043737 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,6 @@ - kotlin-spark-api/common dummy @@ -51,11 +50,6 @@ kotlin-reflect ${kotlin.version} - - org.jetbrains.kotlinx.spark - kotlin-spark-api-common - ${project.version} - From 301f46d36345af4a57c77c5fb82f62326cf482d3 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 13:40:28 +0100 Subject: [PATCH 19/28] added copy docs action --- .github/workflows/generate_docs.yml | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 9ca5d938..efa17b88 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -22,4 +22,17 @@ jobs: check-latest: true - name: Generate docs run: ./mvnw clean package site -Dmaven.test.skip=true - # TODO create branch and copy the docs over from kotlin-spark-api/3.2/target/dokka \ No newline at end of file + - name: Copy docs to "docs" branch + env: + SRC_FOLDER: "kotlin-spark-api/3.2/target/dokka" + TARGET_BRANCH: "docs" + run: | + files=$(find $SRC_FOLDER -type f) # get the file list + git config --global user.name 'GitHub Action' + git config --global user.email 'action@github.com' + git fetch # fetch branches + git checkout $TARGET_BRANCH # checkout to your branch + git checkout ${GITHUB_REF##*/} -- $files # copy files from the source branch + git add -A + git diff-index --quiet HEAD || git commit -am "updated docs" # commit to the repository (ignore if no modification) + git push origin $TARGET_BRANCH # push to remote branch From dca5df879e8b568536860a0c7cc852aa4b794a68 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 13:52:41 +0100 Subject: [PATCH 20/28] added copy docs action --- .github/workflows/generate_docs.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index efa17b88..4a42bc6a 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -3,10 +3,10 @@ name: Generate and publish docs on: push: branches: - - "spark-3.2" + - "origin/spark-3.2" pull_request: branches: - - "spark-3.2" + - "origin/spark-3.2" jobs: generate-and-publish-docs: @@ -25,7 +25,7 @@ jobs: - name: Copy docs to "docs" branch env: SRC_FOLDER: "kotlin-spark-api/3.2/target/dokka" - TARGET_BRANCH: "docs" + TARGET_BRANCH: "origin/docs" run: | files=$(find $SRC_FOLDER -type f) # get the file list git config --global user.name 'GitHub Action' From 
905593225aaf91932e2601aae4f220617e01ebed Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 14:25:23 +0100 Subject: [PATCH 21/28] alright, let's first test it for this branch. If that works, we can merge it with spark-3.2 --- .github/workflows/generate_docs.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 4a42bc6a..0f221f64 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -3,10 +3,12 @@ name: Generate and publish docs on: push: branches: - - "origin/spark-3.2" +# - "origin/spark-3.2" + - "origin/more-documentation" pull_request: branches: - - "origin/spark-3.2" +# - "origin/spark-3.2" + - "origin/more-documentation" jobs: generate-and-publish-docs: From adae99f1af77519ce0eca7ef212908986c6170e5 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Thu, 17 Feb 2022 14:31:58 +0100 Subject: [PATCH 22/28] alright, let's first test it for this branch. If that works, we can merge it with spark-3.2 --- .github/workflows/generate_docs.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 0f221f64..79ce086e 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -3,12 +3,13 @@ name: Generate and publish docs on: push: branches: -# - "origin/spark-3.2" - - "origin/more-documentation" + - "more-documentation" +# - "origin/spark-3.2"- pull_request: branches: + - "more-documentation" # - "origin/spark-3.2" - - "origin/more-documentation" + jobs: generate-and-publish-docs: From d4b8b5cd468ef2edf96d259373edef02de21c993 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 12:21:33 +0100 Subject: [PATCH 23/28] alright, let's first test it for this branch. If that works, we can merge it with spark-3.2 --- .github/workflows/generate_docs.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 79ce086e..e5676b57 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -4,11 +4,11 @@ on: push: branches: - "more-documentation" -# - "origin/spark-3.2"- +# - "spark-3.2"- pull_request: branches: - "more-documentation" -# - "origin/spark-3.2" +# - "spark-3.2" jobs: @@ -31,6 +31,7 @@ jobs: TARGET_BRANCH: "origin/docs" run: | files=$(find $SRC_FOLDER -type f) # get the file list + git add ${files}/* git config --global user.name 'GitHub Action' git config --global user.email 'action@github.com' git fetch # fetch branches From 96b43da661b7ae28891885531d9c42a7c690fd37 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 12:32:04 +0100 Subject: [PATCH 24/28] alright, let's first test it for this branch. 
If that works, we can merge it with spark-3.2 --- .github/workflows/generate_docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index e5676b57..37e03885 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -31,7 +31,7 @@ jobs: TARGET_BRANCH: "origin/docs" run: | files=$(find $SRC_FOLDER -type f) # get the file list - git add ${files}/* + git add ${SRC_FOLDER}/* git config --global user.name 'GitHub Action' git config --global user.email 'action@github.com' git fetch # fetch branches From 883bdc5bbe309e3c33655c6436c31656847a6796 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 12:38:40 +0100 Subject: [PATCH 25/28] alright, let's first test it for this branch. If that works, we can merge it with spark-3.2 --- .github/workflows/generate_docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 37e03885..9d91fe8d 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -31,7 +31,7 @@ jobs: TARGET_BRANCH: "origin/docs" run: | files=$(find $SRC_FOLDER -type f) # get the file list - git add ${SRC_FOLDER}/* + git add -f ${SRC_FOLDER}/* git config --global user.name 'GitHub Action' git config --global user.email 'action@github.com' git fetch # fetch branches From 82d3940192a26dfbc1419111f00de8daf5534bef Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 12:50:22 +0100 Subject: [PATCH 26/28] maybe copycat works better --- .github/workflows/generate_docs.yml | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 9d91fe8d..9ba0c76c 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -3,11 +3,11 @@ name: Generate and publish docs on: push: branches: - - "more-documentation" + - "more-documentation" # TODO make "spark-3.2" # - "spark-3.2"- pull_request: branches: - - "more-documentation" + - "more-documentation" # TODO make "spark-3.2" # - "spark-3.2" @@ -26,17 +26,12 @@ jobs: - name: Generate docs run: ./mvnw clean package site -Dmaven.test.skip=true - name: Copy docs to "docs" branch - env: - SRC_FOLDER: "kotlin-spark-api/3.2/target/dokka" - TARGET_BRANCH: "origin/docs" - run: | - files=$(find $SRC_FOLDER -type f) # get the file list - git add -f ${SRC_FOLDER}/* - git config --global user.name 'GitHub Action' - git config --global user.email 'action@github.com' - git fetch # fetch branches - git checkout $TARGET_BRANCH # checkout to your branch - git checkout ${GITHUB_REF##*/} -- $files # copy files from the source branch - git add -A - git diff-index --quiet HEAD || git commit -am "updated docs" # commit to the repository (ignore if no modification) - git push origin $TARGET_BRANCH # push to remote branch + uses: andstor/copycat-action@v3 + with: + personal_token: ${{ secrets.PERSONAL_TOKEN }} + src_path: "kotlin-spark-api/3.2/target/dokka" + src_branch: "more-documentation" # TODO make "spark-3.2" + dst_owner: "JetBrains" + dst_repo_name: "kotlin-spark-api" + dst_branch: "docs" + From 671fc3b1efcf983b87c9660e9a6e592a87e5e9a5 Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 13:08:06 +0100 Subject: [PATCH 27/28] using actions-gh-pages? 
--- .github/workflows/generate_docs.yml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 9ba0c76c..07d8da73 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -26,12 +26,10 @@ jobs: - name: Generate docs run: ./mvnw clean package site -Dmaven.test.skip=true - name: Copy docs to "docs" branch - uses: andstor/copycat-action@v3 + uses: peaceiris/actions-gh-pages@v3 with: - personal_token: ${{ secrets.PERSONAL_TOKEN }} - src_path: "kotlin-spark-api/3.2/target/dokka" - src_branch: "more-documentation" # TODO make "spark-3.2" - dst_owner: "JetBrains" - dst_repo_name: "kotlin-spark-api" - dst_branch: "docs" + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_branch: docs + publish_dir: ./kotlin-spark-api/3.2/target/dokka + From b5ece11d0ea873aeeca37974f7d36b347fad8f8d Mon Sep 17 00:00:00 2001 From: Jolan Rensen Date: Fri, 18 Feb 2022 13:15:53 +0100 Subject: [PATCH 28/28] it's working now! --- .github/workflows/generate_docs.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/generate_docs.yml b/.github/workflows/generate_docs.yml index 07d8da73..8977f513 100644 --- a/.github/workflows/generate_docs.yml +++ b/.github/workflows/generate_docs.yml @@ -3,12 +3,10 @@ name: Generate and publish docs on: push: branches: - - "more-documentation" # TODO make "spark-3.2" -# - "spark-3.2"- + - "spark-3.2" pull_request: branches: - - "more-documentation" # TODO make "spark-3.2" -# - "spark-3.2" + - "spark-3.2" jobs:
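The helpers documented over the course of these patches are meant to be used together from Kotlin. A minimal end-to-end sketch of how they fit (illustrative only: `withSpark` and `dsOf` are assumed to be the Kotlin Spark API entry points, and `"value"` is assumed to be the column name Spark gives a dataset of primitives):

```kotlin
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    // Build a small Dataset of Ints (dsOf is assumed to be available on the withSpark receiver).
    val ds = dsOf(1, 2, 3, 4)

    // lit(...) wraps a literal value in a Column, as described in the lit KDoc above.
    ds.withColumn("timesTwo", ds.col("value").multiply(lit(2))).show()

    // toList() collects the Dataset back to the driver as a typed Kotlin List, per the toList KDoc above.
    val collected = ds.toList<Int>()
    println(collected)
}
```

If those assumptions hold, this prints the two-column table from `show()` followed by `[1, 2, 3, 4]`.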