
Commit bf2feec

add test case with delay
1 parent: 6457e42


python/pyspark/sql/tests.py

Lines changed: 15 additions & 9 deletions
@@ -4925,25 +4925,31 @@ def test_timestamp_dst(self):
 
     def test_toPandas_batch_order(self):
 
+        def delay_first_part(partition_index, iterator):
+            if partition_index == 0:
+                time.sleep(0.1)
+            return iterator
+
         # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
-        def run_test(num_records, num_parts, max_records):
+        def run_test(num_records, num_parts, max_records, use_delay=False):
             df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            if use_delay:
+                df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
             with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
                 pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
                 self.assertPandasEqual(pdf, pdf_arrow)
 
         cases = [
-            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
-            (512, 64, 2),  # Try medium num partitions to test out of order collection
-            (64, 8, 2),  # Try small number of partitions to test out of order collection
-            (64, 64, 1),  # Test single batch per partition
-            (64, 1, 64),  # Test single partition, single batch
-            (64, 1, 8),  # Test single partition, multiple batches
-            (30, 7, 2),  # Test different sized partitions
+            (1024, 512, 2),  # Use large num partitions for more likely collecting out of order
+            (64, 8, 2, True),  # Use delay in first partition to force collecting out of order
+            (64, 64, 1),  # Test single batch per partition
+            (64, 1, 64),  # Test single partition, single batch
+            (64, 1, 8),  # Test single partition, multiple batches
+            (30, 7, 2),  # Test different sized partitions
         ]
 
         for case in cases:
-            run_test(num_records=case[0], num_parts=case[1], max_records=case[2])
+            run_test(*case)
 
 
 class EncryptionArrowTests(ArrowTests):
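The idea behind the new delay_first_part helper is that sleeping only in partition 0 makes the Arrow record batches from later partitions reach the driver JVM first, so the Arrow-backed toPandas() must re-order the batches back into partition order. A minimal standalone sketch of that pattern, assuming a local SparkSession and the Spark 2.x config name spark.sql.execution.arrow.enabled; the session setup and the final ordering check are illustrative and not part of this commit:

    import time

    from pyspark.sql import SparkSession

    # Standalone sketch (not the test itself): force out-of-order batch arrival
    # and check that the Arrow-backed toPandas() still yields rows in order.
    spark = (SparkSession.builder
             .master("local[4]")
             .config("spark.sql.execution.arrow.enabled", "true")
             .config("spark.sql.execution.arrow.maxRecordsPerBatch", "2")
             .getOrCreate())


    def delay_first_part(partition_index, iterator):
        # Only partition 0 sleeps, so its batches are collected last in the JVM.
        if partition_index == 0:
            time.sleep(0.1)
        return iterator


    df = spark.range(64, numPartitions=8).toDF("a")
    df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()

    # Even though partition 0 arrives last, the rows must still come back as 0..63.
    assert df.toPandas()["a"].tolist() == list(range(64))

    spark.stop()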
