From 0cf822fdb8e1375a06dc2b7ae34ce11f3932cc9f Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 3 Jan 2019 13:13:08 +0800
Subject: [PATCH 1/3] follow up pr for SPARK-25921

---
 python/pyspark/taskcontext.py            | 14 +++---
 python/pyspark/tests/test_taskcontext.py | 55 +++++++++++++++++-------
 2 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index 98b505c9046b..4298996b9738 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -48,10 +48,6 @@ def __new__(cls):
         cls._taskContext = taskContext = object.__new__(cls)
         return taskContext

-    def __init__(self):
-        """Construct a TaskContext, use get instead"""
-        pass
-
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global TaskContext."""
@@ -140,13 +136,13 @@ class BarrierTaskContext(TaskContext):
     _port = None
     _secret = None

-    def __init__(self):
-        """Construct a BarrierTaskContext, use get instead"""
-        pass
-
     @classmethod
     def _getOrCreate(cls):
-        """Internal function to get or create global BarrierTaskContext."""
+        """
+        Internal function to get or create global BarrierTaskContext. We need to make sure
+        BarrierTaskContext returns here because it needs in python worker reuse scenario,
+        see SPARK-25921 for more details.
+        """
         if not isinstance(cls._taskContext, BarrierTaskContext):
             cls._taskContext = object.__new__(cls)
         return cls._taskContext
diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index b3a967440a9b..220412a7fee0 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -18,7 +18,7 @@
 import sys
 import time

-from pyspark import SparkContext, TaskContext, BarrierTaskContext
+from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext
 from pyspark.testing.utils import PySparkTestCase


@@ -118,21 +118,6 @@ def context_barrier(x):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)

-    def test_barrier_with_python_worker_reuse(self):
-        """
-        Verify that BarrierTaskContext.barrier() with reused python worker.
-        """
-        self.sc._conf.set("spark.python.work.reuse", "true")
-        rdd = self.sc.parallelize(range(4), 4)
-        # start a normal job first to start all worker
-        result = rdd.map(lambda x: x ** 2).collect()
-        self.assertEqual([0, 1, 4, 9], result)
-        # make sure `spark.python.work.reuse=true`
-        self.assertEqual(self.sc._conf.get("spark.python.work.reuse"), "true")
-
-        # worker will be reused in this barrier job
-        self.test_barrier()
-
     def test_barrier_infos(self):
         """
         Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
@@ -149,6 +134,44 @@ def f(iterator):
         self.assertTrue(len(taskInfos[0]) == 4)


+class TaskContextTestsWithWorkerReuse(PySparkTestCase):
+
+    def setUp(self):
+        self._old_sys_path = list(sys.path)
+        class_name = self.__class__.__name__
+        conf = SparkConf().set("spark.python.worker.reuse", "true")
+        self.sc = SparkContext('local[2]', class_name, conf=conf)
+
+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Regression test for SPARK-25921: verify that BarrierTaskContext.barrier() with
+        reused python worker.
+        """
+        import os
+        self.sc._conf.set("spark.python.work.reuse", "true")
+        # start a normal job first to start all workers and get all worker pids
+        worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()
+        # the worker will reuse in this barrier job
+        rdd = self.sc.parallelize(range(10), 2)
+
+        def f(iterator):
+            yield sum(iterator)
+
+        def context_barrier(x):
+            tc = BarrierTaskContext.get()
+            time.sleep(random.randint(1, 10))
+            tc.barrier()
+            return (time.time(), os.getpid())
+
+        result = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
+        times = map(lambda x: x[0], result)
+        pids = map(lambda x: x[1], result)
+        # check both barrier and worker reuse effect
+        self.assertTrue(max(times) - min(times) < 1)
+        for pid in pids:
+            self.assertTrue(pid in worker_pids)
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_taskcontext import *

From a5c20db285ad19c37b1a85f4f0e64e57f82e1c21 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 3 Jan 2019 16:17:23 +0800
Subject: [PATCH 2/3] compatible support for python3

---
 python/pyspark/tests/test_taskcontext.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index 220412a7fee0..487a64a4aab9 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -148,7 +148,6 @@ def test_barrier_with_python_worker_reuse(self):
         reused python worker.
         """
         import os
-        self.sc._conf.set("spark.python.work.reuse", "true")
         # start a normal job first to start all workers and get all worker pids
         worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()
         # the worker will reuse in this barrier job
@@ -164,8 +163,8 @@ def context_barrier(x):
             return (time.time(), os.getpid())

         result = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
-        times = map(lambda x: x[0], result)
-        pids = map(lambda x: x[1], result)
+        times = list(map(lambda x: x[0], result))
+        pids = list(map(lambda x: x[1], result))
         # check both barrier and worker reuse effect
         self.assertTrue(max(times) - min(times) < 1)
         for pid in pids:
             self.assertTrue(pid in worker_pids)

From eedd445dfa5eb9d3cb42df58af0b2b2f1c8714e6 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Sun, 6 Jan 2019 02:15:43 +0800
Subject: [PATCH 3/3] address comments

---
 python/pyspark/taskcontext.py            | 4 ++--
 python/pyspark/tests/test_taskcontext.py | 9 ++++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index 4298996b9738..de4b6af23666 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -140,8 +140,8 @@ class BarrierTaskContext(TaskContext):
     def _getOrCreate(cls):
         """
         Internal function to get or create global BarrierTaskContext. We need to make sure
-        BarrierTaskContext returns here because it needs in python worker reuse scenario,
-        see SPARK-25921 for more details.
+        BarrierTaskContext is returned from here because it is needed in python worker reuse
+        scenario, see SPARK-25921 for more details.
""" if not isinstance(cls._taskContext, BarrierTaskContext): cls._taskContext = object.__new__(cls) diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py index 487a64a4aab9..fdb5c40b78a4 100644 --- a/python/pyspark/tests/test_taskcontext.py +++ b/python/pyspark/tests/test_taskcontext.py @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import os import random import sys import time +import unittest from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext from pyspark.testing.utils import PySparkTestCase @@ -134,10 +136,9 @@ def f(iterator): self.assertTrue(len(taskInfos[0]) == 4) -class TaskContextTestsWithWorkerReuse(PySparkTestCase): +class TaskContextTestsWithWorkerReuse(unittest.TestCase): def setUp(self): - self._old_sys_path = list(sys.path) class_name = self.__class__.__name__ conf = SparkConf().set("spark.python.worker.reuse", "true") self.sc = SparkContext('local[2]', class_name, conf=conf) @@ -147,7 +148,6 @@ def test_barrier_with_python_worker_reuse(self): Regression test for SPARK-25921: verify that BarrierTaskContext.barrier() with reused python worker. """ - import os # start a normal job first to start all workers and get all worker pids worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect() # the worker will reuse in this barrier job @@ -170,6 +170,9 @@ def context_barrier(x): for pid in pids: self.assertTrue(pid in worker_pids) + def tearDown(self): + self.sc.stop() + if __name__ == "__main__": import unittest