Deserialization of Linear Regression Broken for Spark 2.3 #455

Description

@tcoatale

It is no longer possible to use a linear regression model that has been serialized and then deserialized with MLeap on Spark 2.3.

The following sequence of operations fails:

    import java.io.File

    import ml.combust.bundle.BundleFile
    import ml.combust.bundle.serializer.SerializationFormat
    import ml.combust.mleap.runtime.MleapSupport._
    import ml.combust.mleap.spark.SparkSupport._
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.bundle.SparkBundleContext
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.regression.LinearRegression
    import resource._
    import spark.implicits._ // assumes an active SparkSession named spark

    if (new File("/tmp/simple-spark-pipeline.zip").exists())
      new File("/tmp/simple-spark-pipeline.zip").delete()

    val trainingData = Seq((1, 0.8, 1.2), (1, 0.7, 1.0), (1, 1.2, 1.1), (1, 0.7, 2.0), (2, 1.2, 1.0))
      .toDF("feature1", "feature2", "value")

    val testingData = Seq((1, 0.8, 1.2), (1, 0.7, 1.0), (1, 1.2, 1.1), (1, 0.7, 2.0), (2, 1.2, 1.0))
      .toDF("feature1", "feature2", "value")

    val vectorAssembler = new VectorAssembler()
      .setInputCols(Array("feature1", "feature2"))
      .setOutputCol("features")

    val assembledData = vectorAssembler.transform(trainingData)

    val lr = new LinearRegression()
      .setFeaturesCol("features")
      .setLabelCol("value")

    val model = lr.fit(assembledData)

    val pipeline = new Pipeline()
      .setStages(Array(vectorAssembler, model))
      .fit(trainingData)

    // serialize the fitted Spark pipeline to an MLeap bundle
    val sbc = SparkBundleContext().withDataset(pipeline.transform(trainingData))
    for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
      pipeline.writeBundle.save(bf)(sbc).get
    }

    // deserialize the bundle back into an MLeap pipeline
    val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
      bundleFile.loadMleapBundle().get
    }).opt.get
    // the root of the bundle is the MLeap pipeline
    val mleapPipeline = bundle.root

    // re-serialize the MLeap pipeline, this time in JSON format
    for (bundle <- managed(BundleFile("jar:file:/tmp/simple-mleap-pipeline.zip"))) {
      mleapPipeline.writeBundle.format(SerializationFormat.Json).save(bundle)
    }

    // load the re-serialized bundle back as a Spark pipeline
    val sparkBundle = (for(bf <- managed(BundleFile("jar:file:/tmp/simple-mleap-pipeline.zip"))) yield {
      bf.loadSparkBundle().get
    }).either

    val transformer = sparkBundle.right.get.root
    transformer.transform(testingData) // throws java.util.NoSuchElementException

with the following stacktrace:

Failed to find a default value for loss
java.util.NoSuchElementException: Failed to find a default value for loss
	at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
	at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.ml.param.Params$class.getOrDefault(params.scala:779)
	at org.apache.spark.ml.PipelineStage.getOrDefault(Pipeline.scala:42)
	at org.apache.spark.ml.param.Params$class.$(params.scala:786)
	at org.apache.spark.ml.PipelineStage.$(Pipeline.scala:42)
	at org.apache.spark.ml.regression.LinearRegressionParams$class.validateAndTransformSchema(LinearRegression.scala:110)
	at org.apache.spark.ml.regression.LinearRegressionModel.validateAndTransformSchema(LinearRegression.scala:640)
	at org.apache.spark.ml.PredictionModel.transformSchema(Predictor.scala:192)
	at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:311)
	at org.apache.spark.ml.PipelineModel$$anonfun$transformSchema$5.apply(Pipeline.scala:311)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
	at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
	at org.apache.spark.ml.PipelineModel.transformSchema(Pipeline.scala:311)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
	at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)

In Spark 2.3.0, a new param was introduced: loss. When LinearRegressionModel.transform is called, validateAndTransformSchema checks the compatibility between the solver param and the loss param, which in our case breaks because the deserialized model carries neither a set value nor a default for those params.
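
For reference, the check looks roughly like the sketch below (paraphrased from Spark 2.3.0's LinearRegressionParams, around the LinearRegression.scala:110 frame in the stacktrace above; identifiers and the require message are abbreviated, not verbatim). The $(loss) lookup is what throws, since $ delegates to getOrDefault and the param has neither a set value nor a default:

    // Paraphrased sketch, not the verbatim Spark source
    override protected def validateAndTransformSchema(
        schema: StructType,
        fitting: Boolean,
        featuresDataType: DataType): StructType = {
      if ($(loss) == "huber") { // <- throws "Failed to find a default value for loss"
        require($(solver) != "normal", "huber loss does not support the normal solver")
      }
      super.validateAndTransformSchema(schema, fitting, featuresDataType)
    }

Note that the check is not guarded by the fitting flag, which is why it also runs when transforming with an already-trained model.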

Quite frankly, it seems like this could be handled better by Spark itself: the check concerns params that only matter for training, yet it is also run when transforming with a model that is already trained. Still, something should probably be done on the MLeap side to make sure export/import remains functional for linear regression.
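
In the meantime, a user-side workaround seems possible (a sketch, not an official fix): re-set the missing training params on the deserialized stages through the public copy(ParamMap) method before transforming. Here "squaredError" is Spark's documented default for loss; solver may need the same treatment if it also fails to round-trip:

    import org.apache.spark.ml.{PipelineModel, Transformer}
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.regression.LinearRegressionModel

    // patch every LinearRegressionModel stage so that loss has an explicit value again
    val patchedStages: Array[Transformer] =
      transformer.asInstanceOf[PipelineModel].stages.map {
        case lrm: LinearRegressionModel =>
          lrm.copy(ParamMap(lrm.loss -> "squaredError")) // Spark's default loss
        case other => other
      }

    // PipelineModel's constructor is private[ml], so apply the stages manually
    val patchedResult = patchedStages.foldLeft(testingData)((df, stage) => stage.transform(df))

This only papers over the symptom, of course; a proper fix would have MLeap's linear regression serializer round-trip (or default) these params.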

Please let me know what you think. I am open to contributing a PR for this, as long as you let me know how you would like to approach it.
