From 0a44d21f96126551795e29e6c2c36c8ef0d1f3c2 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Wed, 18 May 2022 14:18:19 +0200 Subject: [PATCH 1/2] added truncate and limit variables for jupyter notebooks --- .../kotlinx/spark/api/jupyter/Integration.kt | 68 +++++++++++++++++-- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt index 24ae04ce..bd4f287a 100644 --- a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt +++ b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt @@ -22,10 +22,9 @@ package org.jetbrains.kotlinx.spark.api.jupyter import org.apache.spark.api.java.JavaRDDLike import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset -import org.jetbrains.kotlinx.jupyter.api.FieldValue -import org.jetbrains.kotlinx.jupyter.api.HTML -import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost +import org.jetbrains.kotlinx.jupyter.api.* import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration +import kotlin.reflect.typeOf abstract class Integration : JupyterIntegration() { @@ -34,6 +33,11 @@ abstract class Integration : JupyterIntegration() { private val scalaVersion = "2.12.15" private val spark3Version = "3.2.1" + private val displayLimit = "DISPLAY_LIMIT" + private val displayLimitDefault = 20 + private val displayTruncate = "DISPLAY_TRUNCATE" + private val displayTruncateDefault = 30 + /** * Will be run after importing all dependencies */ @@ -84,6 +88,21 @@ abstract class Integration : JupyterIntegration() { import(*imports) onLoaded { + declare( + VariableDeclaration( + name = displayLimit, + value = displayLimitDefault, + type = typeOf(), + isMutable = true, + ), + VariableDeclaration( + name = displayTruncate, + value = displayTruncateDefault, + type = typeOf(), + isMutable = true, + ), + ) + onLoaded() } @@ -101,15 +120,50 @@ abstract class Integration : JupyterIntegration() { // Render Dataset render> { - HTML(it.toHtml()) + val limit = notebook + .variablesState[displayLimit] + ?.value + ?.getOrNull() as? Int + ?: displayLimitDefault + + val truncate = notebook + .variablesState[displayTruncate] + ?.value + ?.getOrNull() as? Int + ?: displayTruncateDefault + + HTML(it.toHtml(limit = limit, truncate = truncate)) } render> { - HTML(it.toJavaRDD().toHtml()) + val limit = notebook + .variablesState[displayLimit] + ?.value + ?.getOrNull() as? Int + ?: displayLimitDefault + + val truncate = notebook + .variablesState[displayTruncate] + ?.value + ?.getOrNull() as? Int + ?: displayTruncateDefault + + HTML(it.toJavaRDD().toHtml(limit = limit, truncate = truncate)) } - render> { - HTML(it.toHtml()) + val limit = notebook + .variablesState[displayLimit] + ?.value + ?.getOrNull() as? Int + ?: displayLimitDefault + + val truncate = notebook + .variablesState[displayTruncate] + ?.value + ?.getOrNull() as? Int + ?: displayTruncateDefault + + HTML(it.toHtml(limit = limit, truncate = truncate)) } } } From 25c52f5078ceadcd62064525c64169fedae3a559 Mon Sep 17 00:00:00 2001 From: Jolanrensen Date: Fri, 20 May 2022 16:35:32 +0200 Subject: [PATCH 2/2] added support for onInterrupt in spark streaming --- .../kotlinx/spark/api/jupyter/Integration.kt | 61 ++++++------ .../spark/api/jupyter/SparkIntegration.kt | 2 - .../api/jupyter/SparkStreamingIntegration.kt | 99 ++++++++++++++++++- .../kotlinx/spark/api/jupyter/JupyterTests.kt | 9 ++ pom.xml | 2 +- 5 files changed, 135 insertions(+), 38 deletions(-) diff --git a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt index bd4f287a..19ddda50 100644 --- a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt +++ b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/Integration.kt @@ -41,11 +41,17 @@ abstract class Integration : JupyterIntegration() { /** * Will be run after importing all dependencies */ - abstract fun KotlinKernelHost.onLoaded() + open fun KotlinKernelHost.onLoaded() = Unit - abstract fun KotlinKernelHost.onShutdown() + open fun KotlinKernelHost.onShutdown() = Unit - abstract fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) + open fun KotlinKernelHost.onInterrupt() = Unit + + open fun KotlinKernelHost.beforeCellExecution() = Unit + + open fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit + + open fun Builder.onLoadedAlsoDo() = Unit open val dependencies: Array = arrayOf( "org.apache.spark:spark-repl_$scalaCompatVersion:$spark3Version", @@ -108,62 +114,55 @@ abstract class Integration : JupyterIntegration() { beforeCellExecution { execute("""scala.Console.setOut(System.out)""") + + beforeCellExecution() } afterCellExecution { snippetInstance, result -> afterCellExecution(snippetInstance, result) } + onInterrupt { + onInterrupt() + } + onShutdown { onShutdown() } - // Render Dataset - render> { - val limit = notebook + fun getLimitAndTruncate() = Pair( + notebook .variablesState[displayLimit] ?.value ?.getOrNull() as? Int - ?: displayLimitDefault - - val truncate = notebook + ?: displayLimitDefault, + notebook .variablesState[displayTruncate] ?.value ?.getOrNull() as? Int ?: displayTruncateDefault + ) + + + // Render Dataset + render> { + val (limit, truncate) = getLimitAndTruncate() HTML(it.toHtml(limit = limit, truncate = truncate)) } render> { - val limit = notebook - .variablesState[displayLimit] - ?.value - ?.getOrNull() as? Int - ?: displayLimitDefault - - val truncate = notebook - .variablesState[displayTruncate] - ?.value - ?.getOrNull() as? Int - ?: displayTruncateDefault + val (limit, truncate) = getLimitAndTruncate() HTML(it.toJavaRDD().toHtml(limit = limit, truncate = truncate)) } - render> { - val limit = notebook - .variablesState[displayLimit] - ?.value - ?.getOrNull() as? Int - ?: displayLimitDefault - val truncate = notebook - .variablesState[displayTruncate] - ?.value - ?.getOrNull() as? Int - ?: displayTruncateDefault + render> { + val (limit, truncate) = getLimitAndTruncate() HTML(it.toHtml(limit = limit, truncate = truncate)) } + + onLoadedAlsoDo() } } diff --git a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt index a3ec6dc5..e817b282 100644 --- a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt +++ b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkIntegration.kt @@ -73,6 +73,4 @@ internal class SparkIntegration : Integration() { override fun KotlinKernelHost.onShutdown() { execute("""spark.stop()""") } - - override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit } diff --git a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt index 4982830c..441672d3 100644 --- a/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt +++ b/jupyter/src/main/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/SparkStreamingIntegration.kt @@ -20,15 +20,18 @@ package org.jetbrains.kotlinx.spark.api.jupyter +import org.apache.spark.streaming.StreamingContextState +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.intellij.lang.annotations.Language -import org.jetbrains.kotlinx.jupyter.api.FieldValue import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost +import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration +import org.jetbrains.kotlinx.jupyter.api.declare +import kotlin.reflect.typeOf /** * %use spark-streaming */ @Suppress("UNUSED_VARIABLE", "LocalVariableName") -@OptIn(ExperimentalStdlibApi::class) internal class SparkStreamingIntegration : Integration() { override val imports: Array = super.imports + arrayOf( @@ -36,17 +39,105 @@ internal class SparkStreamingIntegration : Integration() { "org.apache.hadoop.conf.Configuration", ) + private val sscCollection = mutableSetOf() + override fun KotlinKernelHost.onLoaded() { + + declare( + VariableDeclaration( + name = ::sscCollection.name, + value = sscCollection, + isMutable = false, + type = typeOf>(), + ) + ) + val _0 = execute("""%dumpClassesForSpark""") @Language("kts") val _1 = listOf( + """ + @JvmOverloads + fun withSparkStreaming( + batchDuration: Duration = Durations.seconds(1L), + checkpointPath: String? = null, + hadoopConf: Configuration = SparkHadoopUtil.get().conf(), + createOnError: Boolean = false, + props: Map = emptyMap(), + master: String = SparkConf().get("spark.master", "local[*]"), + appName: String = "Kotlin Spark Sample", + timeout: Long = -1L, + startStreamingContext: Boolean = true, + func: KSparkStreamingSession.() -> Unit, + ) { + + // will only be set when a new context is created + var kSparkStreamingSession: KSparkStreamingSession? = null + + val creatingFunc = { + val sc = SparkConf() + .setAppName(appName) + .setMaster(master) + .setAll( + props + .map { (key, value) -> key X value.toString() } + .asScalaIterable() + ) + + val ssc = JavaStreamingContext(sc, batchDuration) + ssc.checkpoint(checkpointPath) + + kSparkStreamingSession = KSparkStreamingSession(ssc) + func(kSparkStreamingSession!!) + + ssc + } + + val ssc = when { + checkpointPath != null -> + JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) + + else -> creatingFunc() + } + sscCollection += ssc + + if (startStreamingContext) { + ssc.start() + kSparkStreamingSession?.invokeRunAfterStart() + } + ssc.awaitTerminationOrTimeout(timeout) + ssc.stop() + } + """.trimIndent(), """ println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""".trimIndent(), ).map(::execute) } - override fun KotlinKernelHost.onShutdown() = Unit + private fun cleanUp(e: Throwable): String { + while (sscCollection.isNotEmpty()) + sscCollection.first().let { + while (it.state != StreamingContextState.STOPPED) { + try { + it.stop(true, true) + } catch (_: Exception) { + } + } + sscCollection.remove(it) + } + + return "Spark streams cleaned up. Cause: $e" + } + + override fun Builder.onLoadedAlsoDo() { + renderThrowable { + cleanUp(it) + } + } - override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit + override fun KotlinKernelHost.onInterrupt() { + println( + cleanUp(InterruptedException("Kernel was interrupted.")) + ) + } } diff --git a/jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt b/jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt index 96d5d1fa..b4b750ee 100644 --- a/jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt +++ b/jupyter/src/test/kotlin/org/jetbrains/kotlinx/spark/api/jupyter/JupyterTests.kt @@ -28,6 +28,7 @@ import io.kotest.matchers.string.shouldContain import io.kotest.matchers.types.shouldBeInstanceOf import jupyter.kotlin.DependsOn import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.streaming.api.java.JavaStreamingContext import org.intellij.lang.annotations.Language import org.jetbrains.kotlinx.jupyter.EvalRequestData import org.jetbrains.kotlinx.jupyter.ReplForJupyter @@ -263,6 +264,14 @@ class JupyterStreamingTests : ShouldSpec({ context("Jupyter") { withRepl { + // For when onInterrupt is implemented in the Jupyter kernel + should("Have sscCollection instance") { + + @Language("kts") + val sscCollection = exec("""sscCollection""") + sscCollection as? MutableSet shouldNotBe null + } + should("Not have spark instance") { shouldThrowAny { @Language("kts") diff --git a/pom.xml b/pom.xml index c102d72c..f4a488db 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.1.0 1.3.1 5.2.3 - 0.11.0-83 + 0.11.0-95 1.6.21 0.7.5 3.2.1