diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index d8a241417532e..92ef7ce313026 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -105,13 +105,29 @@ def toPandas(self):
                     import pyarrow
                     # Rename columns to avoid duplicated column names.
                     tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
-                    batches = self.toDF(*tmp_column_names)._collect_as_arrow()
+                    self_destruct = self.sql_ctx._conf.arrowPySparkSelfDestructEnabled()
+                    batches = self.toDF(*tmp_column_names)._collect_as_arrow(
+                        split_batches=self_destruct)
                     if len(batches) > 0:
                         table = pyarrow.Table.from_batches(batches)
+                        # Ensure only the table has a reference to the batches, so that
+                        # self_destruct (if enabled) is effective
+                        del batches
                         # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                         # values, but we should use datetime.date to match the behavior with when
                         # Arrow optimization is disabled.
-                        pdf = table.to_pandas(date_as_object=True)
+                        pandas_options = {'date_as_object': True}
+                        if self_destruct:
+                            # Configure PyArrow to use as little memory as possible:
+                            # self_destruct - free columns as they are converted
+                            # split_blocks - create a separate Pandas block for each column
+                            # use_threads - convert one column at a time
+                            pandas_options.update({
+                                'self_destruct': True,
+                                'split_blocks': True,
+                                'use_threads': False,
+                            })
+                        pdf = table.to_pandas(**pandas_options)
                         # Rename back to the original column names.
                         pdf.columns = self.columns
                         for field in self.schema:
@@ -225,11 +241,16 @@ def _to_corrected_pandas_type(dt):
         else:
             return None
 
-    def _collect_as_arrow(self):
+    def _collect_as_arrow(self, split_batches=False):
         """
         Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
         and available on driver and worker Python environments.
         This is an experimental feature.
+
+        :param split_batches: split batches such that each column is in its own allocation, so
+            that the selfDestruct optimization is effective; default False.
+
+        .. note:: Experimental.
         """
         from pyspark.sql.dataframe import DataFrame
 
@@ -240,7 +261,26 @@ def _collect_as_arrow(self):
 
         # Collect list of un-ordered batches where last element is a list of correct order indices
         try:
-            results = list(_load_from_socket((port, auth_secret), ArrowCollectSerializer()))
+            batch_stream = _load_from_socket((port, auth_secret), ArrowCollectSerializer())
+            if split_batches:
+                # When spark.sql.execution.arrow.pyspark.selfDestruct.enabled, ensure
+                # each column in each record batch is contained in its own allocation.
+                # Otherwise, selfDestruct does nothing; it frees each column as it is
+                # converted, but each column will actually be a list of slices of record
+                # batches, and so no memory is actually freed until all columns are
+                # converted.
+                import pyarrow as pa
+                results = []
+                for batch_or_indices in batch_stream:
+                    if isinstance(batch_or_indices, pa.RecordBatch):
+                        batch_or_indices = pa.RecordBatch.from_arrays([
+                            # This call actually reallocates the array
+                            pa.concat_arrays([array])
+                            for array in batch_or_indices
+                        ], schema=batch_or_indices.schema)
+                    results.append(batch_or_indices)
+            else:
+                results = list(batch_stream)
         finally:
             # Join serving thread and raise any exceptions from collectAsArrowToPython
             jsocket_auth_server.getResult()
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 938e67f28ee7e..184360749fbe5 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -25,7 +25,7 @@
 
 from pyspark import SparkContext, SparkConf
 from pyspark.sql import Row, SparkSession
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import rand, udf
 from pyspark.sql.types import StructType, StringType, IntegerType, LongType, \
     FloatType, DoubleType, DecimalType, DateType, TimestampType, BinaryType, StructField, \
     ArrayType, NullType
@@ -196,6 +196,37 @@ def test_pandas_round_trip(self):
         pdf_arrow = df.toPandas()
         assert_frame_equal(pdf_arrow, pdf)
 
+    def test_pandas_self_destruct(self):
+        import pyarrow as pa
+        rows = 2 ** 10
+        cols = 4
+        expected_bytes = rows * cols * 8
+        df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+        # Test the self_destruct behavior by testing _collect_as_arrow directly
+        allocation_before = pa.total_allocated_bytes()
+        batches = df._collect_as_arrow(split_batches=True)
+        table = pa.Table.from_batches(batches)
+        del batches
+        pdf_split = table.to_pandas(self_destruct=True, split_blocks=True, use_threads=False)
+        allocation_after = pa.total_allocated_bytes()
+        difference = allocation_after - allocation_before
+        # Should be around 1x the data size (table should not hold on to any memory)
+        self.assertGreaterEqual(difference, 0.9 * expected_bytes)
+        self.assertLessEqual(difference, 1.1 * expected_bytes)
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": False}):
+            no_self_destruct_pdf = df.toPandas()
+            # Note while memory usage is 2x data size here (both table and pdf hold on to
+            # memory), in this case Arrow still only tracks 1x worth of memory (since the
+            # batches are not allocated by Arrow in this case), so we can't make any
+            # assertions here
+
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+            self_destruct_pdf = df.toPandas()
+
+        assert_frame_equal(pdf_split, no_self_destruct_pdf)
+        assert_frame_equal(pdf_split, self_destruct_pdf)
+
     def test_filtered_frame(self):
         df = self.spark.range(3).toDF("i")
         pdf = df.filter("i < 0").toPandas()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 25f90e8f25f4a..c48ad48c0a013 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2023,6 +2023,17 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val ARROW_PYSPARK_SELF_DESTRUCT_ENABLED =
+    buildConf("spark.sql.execution.arrow.pyspark.selfDestruct.enabled")
+      .doc("When true, make use of Apache Arrow's self-destruct and split-blocks options " +
+        "for columnar data transfers in PySpark, when converting from Arrow to Pandas. " +
+        "This reduces memory usage at the cost of some CPU time. " +
+        "This optimization applies to: pyspark.sql.DataFrame.toPandas " +
+        "when 'spark.sql.execution.arrow.pyspark.enabled' is set.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val PYSPARK_JVM_STACKTRACE_ENABLED =
     buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
       .doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
@@ -3577,6 +3588,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
 
+  def arrowPySparkSelfDestructEnabled: Boolean = getConf(ARROW_PYSPARK_SELF_DESTRUCT_ENABLED)
+
   def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
 
   def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)