Commit a6a189f

[SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

1 parent: f3ad32f
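For orientation, here is a minimal usage sketch of the new flag. The session setup and toy DataFrame are illustrative, not part of the commit; Arrow-based conversion must be enabled for this code path to be taken.

```python
from pyspark.sql import SparkSession

# Illustrative setup; only toPandas(selfDestruct=...) comes from this commit.
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.range(1_000_000).toDF("id")

# With selfDestruct=True, PyArrow frees each Arrow column as soon as it has
# been converted, reducing peak memory during the Arrow-to-pandas step.
pdf = df.toPandas(selfDestruct=True)
```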

File tree: 3 files changed, +31 −3 lines

- python/pyspark/sql/pandas/conversion.py
- python/pyspark/sql/pandas/serializers.py
- python/pyspark/sql/tests/test_arrow.py

python/pyspark/sql/pandas/conversion.py

Lines changed: 14 additions & 2 deletions
```diff
@@ -34,7 +34,7 @@ class PandasConversionMixin(object):
     """
 
     @since(1.3)
-    def toPandas(self):
+    def toPandas(self, selfDestruct=False):
         """
         Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
 
@@ -103,10 +103,22 @@ def toPandas(self):
                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if selfDestruct:
+                            # Configure PyArrow to use as little memory as possible:
+                            # self_destruct - free columns as they are converted
+                            # split_blocks - create a separate Pandas block for each column
+                            # use_threads - convert one column at a time
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)
                         # Rename back to the original column names.
                         pdf.columns = self.columns
                         for field in self.schema:
```
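The options passed here are plain `pyarrow.Table.to_pandas` arguments, so the behaviour can be sketched with PyArrow alone (the toy table below is illustrative):

```python
import pyarrow as pa

table = pa.table({"x": list(range(10)), "y": [float(i) for i in range(10)]})

# self_destruct frees each Arrow column once it has been converted,
# split_blocks gives every column its own pandas block so it can be released
# independently, and use_threads=False converts one column at a time.
pdf = table.to_pandas(
    date_as_object=True,
    self_destruct=True,
    split_blocks=True,
    use_threads=False,
)
# After a self-destruct conversion the table's columns have been released,
# so the Table object must not be used again.
del table
```

This also explains the `del batches` added above: once the Table references the record batches' buffers, dropping the batch list avoids holding an extra Python reference that would keep the Arrow memory alive while columns are being freed.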

python/pyspark/sql/pandas/serializers.py

Lines changed: 11 additions & 1 deletion
```diff
@@ -90,7 +90,17 @@ def load_stream(self, stream):
         import pyarrow as pa
         reader = pa.ipc.open_stream(stream)
         for batch in reader:
-            yield batch
+            # In the case toPandas is called with selfDestruct=True,
+            # ensure each column in each record batch is contained in
+            # its own allocation. Otherwise, selfDestruct does
+            # nothing; it frees each column as it is converted, but each
+            # column will actually be a list of slices of record
+            # batches, and so no memory is actually freed until all
+            # columns are converted.
+            split_batch = pa.RecordBatch.from_arrays([
+                pa.concat_arrays([array]) for array in batch
+            ], schema=batch.schema)
+            yield split_batch
 
     def __repr__(self):
         return "ArrowStreamSerializer"
```

python/pyspark/sql/tests/test_arrow.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -190,6 +190,12 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        pdf = self.create_pandas_data_frame()
+        df = self.spark.createDataFrame(self.data, schema=self.schema)
+        pdf_arrow = df.toPandas(selfDestruct=True)
+        assert_frame_equal(pdf_arrow, pdf)
+
     def test_filtered_frame(self):
         df = self.spark.range(3).toDF("i")
         pdf = df.filter("i < 0").toPandas()
```
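The new test relies on helpers from the test class in test_arrow.py (`create_pandas_data_frame`, `self.data`, `self.schema`). A rough standalone equivalent of the round-trip check, with illustrative data and setup, would be:

```python
import pandas as pd
from pandas.testing import assert_frame_equal
from pyspark.sql import SparkSession

# Illustrative setup; the real test gets its data and schema from the
# test class in test_arrow.py.
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df = spark.createDataFrame(pdf)

# The self-destruct path must produce the same pandas DataFrame as the
# default Arrow-based conversion.
assert_frame_equal(df.toPandas(selfDestruct=True), df.toPandas())
```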
