Truncate and limit variables and onInterrupt stream cleanup for jupyter notebooks (1.1.0) #158

Merged: 2 commits, May 23, 2022

Changes from all commits:

@@ -22,10 +22,9 @@ package org.jetbrains.kotlinx.spark.api.jupyter
 import org.apache.spark.api.java.JavaRDDLike
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
-import org.jetbrains.kotlinx.jupyter.api.FieldValue
-import org.jetbrains.kotlinx.jupyter.api.HTML
-import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
+import org.jetbrains.kotlinx.jupyter.api.*
 import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration
+import kotlin.reflect.typeOf

 abstract class Integration : JupyterIntegration() {

@@ -34,14 +33,25 @@ abstract class Integration : JupyterIntegration() {
     private val scalaVersion = "2.12.15"
     private val spark3Version = "3.2.1"

+    private val displayLimit = "DISPLAY_LIMIT"
+    private val displayLimitDefault = 20
+    private val displayTruncate = "DISPLAY_TRUNCATE"
+    private val displayTruncateDefault = 30
+
     /**
      * Will be run after importing all dependencies
      */
-    abstract fun KotlinKernelHost.onLoaded()
+    open fun KotlinKernelHost.onLoaded() = Unit
+
+    open fun KotlinKernelHost.onShutdown() = Unit
+
+    open fun KotlinKernelHost.onInterrupt() = Unit
+
+    open fun KotlinKernelHost.beforeCellExecution() = Unit

-    abstract fun KotlinKernelHost.onShutdown()
+    open fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit

-    abstract fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue)
+    open fun Builder.onLoadedAlsoDo() = Unit

     open val dependencies: Array<String> = arrayOf(
         "org.apache.spark:spark-repl_$scalaCompatVersion:$spark3Version",
@@ -84,32 +94,75 @@ abstract class Integration : JupyterIntegration() {
         import(*imports)

         onLoaded {
+            declare(
+                VariableDeclaration(
+                    name = displayLimit,
+                    value = displayLimitDefault,
+                    type = typeOf<Int>(),
+                    isMutable = true,
+                ),
+                VariableDeclaration(
+                    name = displayTruncate,
+                    value = displayTruncateDefault,
+                    type = typeOf<Int>(),
+                    isMutable = true,
+                ),
+            )
+
             onLoaded()
         }

         beforeCellExecution {
             execute("""scala.Console.setOut(System.out)""")
+
+            beforeCellExecution()
         }

         afterCellExecution { snippetInstance, result ->
             afterCellExecution(snippetInstance, result)
         }

+        onInterrupt {
+            onInterrupt()
+        }
+
         onShutdown {
             onShutdown()
         }

+        fun getLimitAndTruncate() = Pair(
+            notebook
+                .variablesState[displayLimit]
+                ?.value
+                ?.getOrNull() as? Int
+                ?: displayLimitDefault,
+            notebook
+                .variablesState[displayTruncate]
+                ?.value
+                ?.getOrNull() as? Int
+                ?: displayTruncateDefault
+        )
+
+
         // Render Dataset
         render<Dataset<*>> {
-            HTML(it.toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toHtml(limit = limit, truncate = truncate))
         }

         render<RDD<*>> {
-            HTML(it.toJavaRDD().toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toJavaRDD().toHtml(limit = limit, truncate = truncate))
         }

         render<JavaRDDLike<*, *>> {
-            HTML(it.toHtml())
+            val (limit, truncate) = getLimitAndTruncate()
+
+            HTML(it.toHtml(limit = limit, truncate = truncate))
         }
+
+        onLoadedAlsoDo()
     }
 }
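
With this change, the row limit and truncation used by the HTML renderers are plain mutable notebook variables, read back through `notebook.variablesState` on every render. A minimal sketch of how a notebook cell could use them, where the Dataset `ds` is a hypothetical example value:

    // Override the defaults declared by the integration (both are mutable Ints).
    DISPLAY_LIMIT = 5       // render at most 5 rows instead of the default 20
    DISPLAY_TRUNCATE = 10   // truncate each rendered value to 10 characters

    // Any Dataset, RDD, or JavaRDDLike cell result is now rendered with these
    // settings; if either variable no longer holds an Int, the renderers fall
    // back to the defaults (20 and 30) via getLimitAndTruncate().
    ds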
@@ -73,6 +73,4 @@ internal class SparkIntegration : Integration() {
     override fun KotlinKernelHost.onShutdown() {
         execute("""spark.stop()""")
     }
-
-    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
 }
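
Because the hooks on Integration are now open with no-op defaults rather than abstract, subclasses only override what they actually use, which is why the empty afterCellExecution override above can simply be deleted. A sketch of a hypothetical subclass relying on those defaults:

    // Hypothetical integration: only onLoaded is overridden; onShutdown,
    // onInterrupt, beforeCellExecution, and afterCellExecution all inherit
    // the open no-op implementations from Integration.
    internal class MyIntegration : Integration() {
        override fun KotlinKernelHost.onLoaded() {
            execute("""println("loaded")""")
        }
    }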
@@ -20,33 +20,124 @@
 package org.jetbrains.kotlinx.spark.api.jupyter


+import org.apache.spark.streaming.StreamingContextState
+import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.intellij.lang.annotations.Language
-import org.jetbrains.kotlinx.jupyter.api.FieldValue
 import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
+import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration
+import org.jetbrains.kotlinx.jupyter.api.declare
+import kotlin.reflect.typeOf

 /**
  * %use spark-streaming
  */
 @Suppress("UNUSED_VARIABLE", "LocalVariableName")
 @OptIn(ExperimentalStdlibApi::class)
 internal class SparkStreamingIntegration : Integration() {

     override val imports: Array<String> = super.imports + arrayOf(
         "org.apache.spark.deploy.SparkHadoopUtil",
         "org.apache.hadoop.conf.Configuration",
     )

+    private val sscCollection = mutableSetOf<JavaStreamingContext>()
+
     override fun KotlinKernelHost.onLoaded() {

+        declare(
+            VariableDeclaration(
+                name = ::sscCollection.name,
+                value = sscCollection,
+                isMutable = false,
+                type = typeOf<MutableSet<JavaStreamingContext>>(),
+            )
+        )
+
         val _0 = execute("""%dumpClassesForSpark""")

         @Language("kts")
         val _1 = listOf(
+            """
+            @JvmOverloads
+            fun withSparkStreaming(
+                batchDuration: Duration = Durations.seconds(1L),
+                checkpointPath: String? = null,
+                hadoopConf: Configuration = SparkHadoopUtil.get().conf(),
+                createOnError: Boolean = false,
+                props: Map<String, Any> = emptyMap(),
+                master: String = SparkConf().get("spark.master", "local[*]"),
+                appName: String = "Kotlin Spark Sample",
+                timeout: Long = -1L,
+                startStreamingContext: Boolean = true,
+                func: KSparkStreamingSession.() -> Unit,
+            ) {
+
+                // will only be set when a new context is created
+                var kSparkStreamingSession: KSparkStreamingSession? = null
+
+                val creatingFunc = {
+                    val sc = SparkConf()
+                        .setAppName(appName)
+                        .setMaster(master)
+                        .setAll(
+                            props
+                                .map { (key, value) -> key X value.toString() }
+                                .asScalaIterable()
+                        )
+
+                    val ssc = JavaStreamingContext(sc, batchDuration)
+                    ssc.checkpoint(checkpointPath)
+
+                    kSparkStreamingSession = KSparkStreamingSession(ssc)
+                    func(kSparkStreamingSession!!)
+
+                    ssc
+                }
+
+                val ssc = when {
+                    checkpointPath != null ->
+                        JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError)
+
+                    else -> creatingFunc()
+                }
+                sscCollection += ssc
+
+                if (startStreamingContext) {
+                    ssc.start()
+                    kSparkStreamingSession?.invokeRunAfterStart()
+                }
+                ssc.awaitTerminationOrTimeout(timeout)
+                ssc.stop()
+            }
+            """.trimIndent(),
             """
             println("To start a spark streaming session, simply use `withSparkStreaming { }` inside a cell. To use Spark normally, use `withSpark { }` in a cell, or use `%use spark` to start a Spark session for the whole notebook.")""".trimIndent(),
         ).map(::execute)
     }

-    override fun KotlinKernelHost.onShutdown() = Unit
+    private fun cleanUp(e: Throwable): String {
+        while (sscCollection.isNotEmpty())
+            sscCollection.first().let {
+                while (it.state != StreamingContextState.STOPPED) {
+                    try {
+                        it.stop(true, true)
+                    } catch (_: Exception) {
+                    }
+                }
+                sscCollection.remove(it)
+            }
+
+        return "Spark streams cleaned up. Cause: $e"
+    }
+
+    override fun Builder.onLoadedAlsoDo() {
+        renderThrowable<IllegalMonitorStateException> {
+            cleanUp(it)
+        }
+    }

-    override fun KotlinKernelHost.afterCellExecution(snippetInstance: Any, result: FieldValue) = Unit
+    override fun KotlinKernelHost.onInterrupt() {
+        println(
+            cleanUp(InterruptedException("Kernel was interrupted."))
+        )
+    }
 }
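
Every JavaStreamingContext created by withSparkStreaming is registered in sscCollection, so cleanUp can stop all live streams when the kernel is interrupted, or when an IllegalMonitorStateException is rendered. A sketch of the notebook-side flow, assuming `%use spark-streaming` and that KSparkStreamingSession exposes the context as `ssc` as in the library's examples; the socket host and port are illustrative:

    // Start a stream; the created context is added to sscCollection.
    withSparkStreaming(batchDuration = Durations.seconds(2), timeout = 60_000L) {
        val lines = ssc.socketTextStream("localhost", 9999) // illustrative source
        lines.print()
    }
    // Interrupting the kernel now calls onInterrupt(), which stops each
    // registered context (stopSparkContext = true, stopGracefully = true)
    // and prints "Spark streams cleaned up. Cause: ...".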
@@ -28,6 +28,7 @@ import io.kotest.matchers.string.shouldContain
 import io.kotest.matchers.types.shouldBeInstanceOf
 import jupyter.kotlin.DependsOn
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.intellij.lang.annotations.Language
 import org.jetbrains.kotlinx.jupyter.EvalRequestData
 import org.jetbrains.kotlinx.jupyter.ReplForJupyter
@@ -263,6 +264,14 @@ class JupyterStreamingTests : ShouldSpec({
     context("Jupyter") {
         withRepl {

+            // For when onInterrupt is implemented in the Jupyter kernel
+            should("Have sscCollection instance") {
+
+                @Language("kts")
+                val sscCollection = exec("""sscCollection""")
+                sscCollection as? MutableSet<JavaStreamingContext> shouldNotBe null
+            }
+
             should("Not have spark instance") {
                 shouldThrowAny {
                     @Language("kts")
pom.xml (1 addition, 1 deletion)

@@ -17,7 +17,7 @@
     <kotest-extensions-allure.version>1.1.0</kotest-extensions-allure.version>
     <kotest-extensions-testcontainers.version>1.3.1</kotest-extensions-testcontainers.version>
     <kotest.version>5.2.3</kotest.version>
-    <kotlin-jupyter-api.version>0.11.0-83</kotlin-jupyter-api.version>
+    <kotlin-jupyter-api.version>0.11.0-95</kotlin-jupyter-api.version>
     <kotlin.version>1.6.21</kotlin.version>
     <kotlinx.html.version>0.7.5</kotlinx.html.version>
     <spark3.version>3.2.1</spark3.version>