diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala index 7eef3ced422e..4ad9ebbe36be 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala @@ -129,7 +129,9 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -142,7 +144,8 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala index 125cdf7259fe..2b204a0470d0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala @@ -94,7 +94,9 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { val rMetadata = ("class" -> instance.getClass.getName) ~ ("ratingCol" -> instance.ratingCol) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.alsModel.save(modelPath) } @@ -107,7 +109,8 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val modelPath = new Path(path, "model").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val ratingCol = (rMetadata \ "ratingCol").extract[String] val alsModel = ALSModel.load(modelPath) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala index d4486f1b80a1..4daf0f27546b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala @@ -120,7 +120,9 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp ("size" -> instance.size.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -133,7 +135,8 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val size = (rMetadata \ "size").extract[Array[Long]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala index 992a0c18819f..12e824c0fdae 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeClassifierWrapper.scala @@ -131,7 +131,9 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -144,7 +146,8 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala index db421b5a1875..48342fc47141 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/DecisionTreeRegressorWrapper.scala @@ -114,7 +114,9 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -127,7 +129,8 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala index 635af0563da0..7e3c7ab5f2fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMClassifierWrapper.scala @@ -151,7 +151,9 @@ private[r] object FMClassifierWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -164,7 +166,8 @@ private[r] object FMClassifierWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala index b036a1d102d9..60792dadec5e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FMRegressorWrapper.scala @@ -132,7 +132,9 @@ private[r] object FMRegressorWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -145,7 +147,8 @@ private[r] object FMRegressorWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala index b8151d8d9070..86c11eadf8ac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/FPGrowthWrapper.scala @@ -77,8 +77,9 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] { val rMetadataJson: String = compact(render( "class" -> instance.getClass.getName )) - - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.fpGrowthModel.save(modelPath) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala index 777191ef5e5c..5bf021ca3bd4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTClassifierWrapper.scala @@ -138,7 +138,9 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + sparkSession.createDataFrame( + Seq(Tuple1(rMetadataJson)) + ).repartition(1).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -151,7 +153,8 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper] val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala index 6e5ca47fabae..575ae2458222 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GBTRegressorWrapper.scala @@ -122,7 +122,9 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] { ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -135,7 +137,8 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] { val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala index 9a98a8b18b14..dd6e91e891d6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala @@ -113,7 +113,9 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp ("logLikelihood" -> instance.logLikelihood) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -126,7 +128,8 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val dim = (rMetadata \ "dim").extract[Int] val logLikelihood = (rMetadata \ "logLikelihood").extract[Double] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala index 60cf0631f91d..778af00acc25 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala @@ -170,7 +170,9 @@ private[r] object GeneralizedLinearRegressionWrapper ("rAic" -> instance.rAic) ~ ("rNumIterations" -> instance.rNumIterations) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -184,7 +186,8 @@ private[r] object GeneralizedLinearRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]] val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala index d4a3adea460f..c8236d7a2a46 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala @@ -99,7 +99,9 @@ private[r] object IsotonicRegressionWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -112,7 +114,8 @@ private[r] object IsotonicRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala index 78c9a15aac59..063caaee0302 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala @@ -123,7 +123,9 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { ("size" -> instance.size.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } } @@ -136,7 +138,8 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val size = (rMetadata \ "size").extract[Array[Long]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala index 943c38178d6f..cfcd4a85ab27 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala @@ -198,7 +198,9 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] { ("logPerplexity" -> instance.logPerplexity) ~ ("vocabulary" -> instance.vocabulary.toList) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -211,7 +213,8 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val logLikelihood = (rMetadata \ "logLikelihood").extract[Double] val logPerplexity = (rMetadata \ "logPerplexity").extract[Double] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala index 96b00fab7e34..ee86c55486e6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearRegressionWrapper.scala @@ -127,7 +127,9 @@ private[r] object LinearRegressionWrapper val rMetadata = ("class" -> instance.getClass.getName) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -140,7 +142,8 @@ private[r] object LinearRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala index 3645af3e5311..69e4a8ec2263 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -137,7 +137,9 @@ private[r] object LinearSVCWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -150,7 +152,8 @@ private[r] object LinearSVCWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala index cac3d0609b20..ff7fd6e42729 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -192,7 +192,9 @@ private[r] object LogisticRegressionWrapper ("features" -> instance.features.toImmutableArraySeq) ~ ("labels" -> instance.labels.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -205,7 +207,8 @@ private[r] object LogisticRegressionWrapper val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val features = (rMetadata \ "features").extract[Array[String]] val labels = (rMetadata \ "labels").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala index 96c588acc140..e5a6a0f0853b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala @@ -142,7 +142,9 @@ private[r] object MultilayerPerceptronClassifierWrapper val rMetadata = "class" -> instance.getClass.getName val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala index d5e8e0ef4890..bd9905d19aed 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala @@ -102,7 +102,9 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] { ("labels" -> instance.labels.toImmutableArraySeq) ~ ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions.
+ sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) instance.pipeline.save(pipelinePath) } @@ -115,7 +117,8 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] { val rMetadataPath = new Path(path, "rMetadata").toString val pipelinePath = new Path(path, "pipeline").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val labels = (rMetadata \ "labels").extract[Array[String]] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index 551c7514ee85..3a7539e0937f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -33,7 +33,8 @@ private[r] object RWrappers extends MLReader[Object] { override def load(path: String): Object = { implicit val format = DefaultFormats val rMetadataPath = new Path(path, "rMetadata").toString - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val className = (rMetadata \ "class").extract[String] className match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala index 7c4175a6c591..1c1bd046de62 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestClassifierWrapper.scala @@ -141,7 +141,10 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file. If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) + instance.pipeline.save(pipelinePath) } } @@ -154,7 +157,8 @@ private[r] object RandomForestClassifierWrapper extends MLReadable[RandomForestC val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]] diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala index 911571cac77d..700989e34e45 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RandomForestRegressorWrapper.scala @@ -124,7 +124,10 @@ private[r] object RandomForestRegressorWrapper extends MLReadable[RandomForestRe ("features" -> instance.features.toImmutableArraySeq) val rMetadataJson: String = compact(render(rMetadata)) - sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + // Note that we should write a single file.
If there is more than one row, + // it produces more partitions. + sparkSession.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath) + instance.pipeline.save(pipelinePath) } } @@ -137,7 +140,8 @@ private[r] object RandomForestRegressorWrapper extends MLReadable[RandomForestRe val pipelinePath = new Path(path, "pipeline").toString val pipeline = PipelineModel.load(pipelinePath) - val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadataStr = sparkSession.read.text(rMetadataPath) + .first().getString(0) val rMetadata = parse(rMetadataStr) val formula = (rMetadata \ "formula").extract[String] val features = (rMetadata \ "features").extract[Array[String]]
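
For context, the write/read round trip that this patch standardizes on can be exercised in isolation. Below is a minimal, self-contained Scala sketch of the same pattern; it is not part of the patch, and the object name, the local[*] master, and the /tmp output path are illustrative assumptions. It shows why a single-row DataFrame written with write.text yields a single part file, and how read.text plus first().getString(0) recovers the JSON string on load, mirroring what sc.textFile(path, 1).first() used to return.

import org.apache.spark.sql.SparkSession

// Minimal round-trip sketch; names and paths below are hypothetical.
object RMetadataRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rMetadata round-trip sketch")
      .getOrCreate()

    val rMetadataPath = "/tmp/rMetadata-sketch" // hypothetical output directory
    val rMetadataJson = """{"class":"demo.Wrapper","features":["x1","x2"]}"""

    // Write side: a one-row local Seq becomes a one-partition DataFrame,
    // so write.text emits a single part file. With more rows there would
    // be more partitions, and hence more files, which is what the added
    // comments in the diff warn about.
    spark.createDataFrame(Seq(Tuple1(rMetadataJson))).write.text(rMetadataPath)

    // Read side: read.text exposes one string column named "value";
    // first().getString(0) recovers the JSON line.
    val restored = spark.read.text(rMetadataPath).first().getString(0)
    assert(restored == rMetadataJson)

    spark.stop()
  }
}

One design note: the GBTClassifierWrapper hunk reaches the same single-file guarantee explicitly via repartition(1) rather than relying on the one-row input already having one partition; both forms write exactly one part file for this metadata.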