Skip to content
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
case JobFailed(e: Exception) => scala.util.Failure(e)
}
}

/** Get the corresponding job id for this action. */
def jobId = jobWaiter.jobId
}


Expand Down
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
Expand Down Expand Up @@ -574,4 +574,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

def name(): String = rdd.name

/**
* :: Experimental ::
* The asynchronous version of the foreach action.
*
* @param f the function to apply to all the elements of the RDD
* @return a FutureAction for the action
*/
@Experimental
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add javadoc for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we mark this as experimental in Java?

I don't know if we have a way to mark stuff as experimental, but maybe put this in the javadoc:

* THIS IS AN EXPERIMENTAL API THAT MIGHT CHANGE IN THE FUTURE.

@pwendell thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Scala-land the javadocs start like this:

 * :: Experimental ::

Perhaps just prepend that to the first line in the javadoc (so the summary contains both the short description and that it's experimental).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin @lirui-intel Make it consistent with the other code where we do this. There is no need to have a big sentence with caps. The ::Experimental:: string along with the @Experimental annotation is what we do. Look in other Java code that has this.

import org.apache.spark.SparkContext._
rdd.foreachAsync(x => f.call(x))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Applies a function f to all elements of this RDD.
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, _.foreach(f), Range(0, self.partitions.size),
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
(index, data) => Unit, Unit)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package org.apache.spark.scheduler
*/
private[spark] class JobWaiter[T](
dagScheduler: DAGScheduler,
jobId: Int,
val jobId: Int,
totalTasks: Int,
resultHandler: (Int, T) => Unit)
extends JobListener {
Expand Down
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ object MimaExcludes {
Seq(
// Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
// Should probably mark this as Experimental
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.
Expand Down