
Conversation

@HyukjinKwon
Member

What changes were proposed in this pull request?

This PR adds a configuration to control the fallback of Arrow optimization for toPandas and createDataFrame with Pandas DataFrame.

How was this patch tested?

Manually tested and unit tests added.

You can test this by:

createDataFrame

spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame(pdf, "a: map<string, int>")
spark.conf.set("spark.sql.execution.arrow.enabled", False)
pdf = spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame(pdf, "a: map<string, int>")

toPandas

spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame([[{'a': 1}]]).toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
spark.createDataFrame([[{'a': 1}]]).toPandas()
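
For readers following along, the expected outcomes of the toPandas scenarios above (assuming, as this PR's tests do, that the Arrow path cannot handle map types, and that a SparkSession named `spark` is available) are roughly:

```python
# Fallback enabled: the map column makes Arrow unusable, a warning is emitted,
# and toPandas() still succeeds through the non-Arrow path.
spark.conf.set("spark.sql.execution.arrow.enabled", True)
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", True)
spark.createDataFrame([[{'a': 1}]]).toPandas()

# Fallback disabled: the same call raises instead of silently falling back, and the
# error message points at 'spark.sql.execution.arrow.fallback.enabled'.
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", False)
try:
    spark.createDataFrame([[{'a': 1}]]).toPandas()
except Exception as e:
    print(e)
```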

Member Author

Likewise, the change from here is due to the removed `else:` block.

@gatorsmile
Member

cc @ueshin

@SparkQA

SparkQA commented Feb 26, 2018

Test build #87673 has finished for PR 20678 at commit ff9d38b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2018

Test build #87674 has finished for PR 20678 at commit 7f87d25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@BryanCutler BryanCutler left a comment

This looks great @HyukjinKwon , thanks for doing it! I just had a comment regarding testing.

buildConf("spark.sql.execution.arrow.fallback.enabled")
.doc("When true, the optimization by 'spark.sql.execution.arrow.enabled' " +
"could be disabled when it is unable to be used, and fallback to " +
"non-optimization.")
Member

Just a suggestion: "When true, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically to non-optimized implementations if an error occurs."

with self.assertRaisesRegexp(Exception, 'Unsupported type'):
df.toPandas()
@contextmanager
def arrow_fallback(self, enabled):
Member

I think it would be best to disable fallback for all the tests on setup/teardown. That way if something goes wrong elsewhere, the tests won't start passing due to falling back. For the test where it is enabled, you could do that explicitly. What do you think?
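
For illustration, the suggested setup/teardown arrangement might look roughly like this (a sketch only; the class, base class, and attribute names are assumptions based on the PySpark test layout, not the actual patch):

```python
from pyspark.sql.tests import ReusedSQLTestCase


class ArrowTests(ReusedSQLTestCase):

    @classmethod
    def setUpClass(cls):
        super(ArrowTests, cls).setUpClass()
        cls.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        # Disable fallback for the whole suite so an unrelated failure can't be
        # silently masked by falling back; the fallback-specific tests re-enable
        # it explicitly inside their own scope.
        cls.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

    @classmethod
    def tearDownClass(cls):
        cls.spark.conf.unset("spark.sql.execution.arrow.enabled")
        cls.spark.conf.unset("spark.sql.execution.arrow.fallback.enabled")
        super(ArrowTests, cls).tearDownClass()
```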

Member Author

@HyukjinKwon HyukjinKwon Feb 27, 2018

Yup, makes sense. Will give it a shot.

BTW, while we are here, I was thinking of adding a more generalized version of a util like arrow_fallback to reduce configuration-specific code in the test scope, but I was hesitant because this approach is new to PySpark. WDYT? I will do another PR for this cleanup if we all feel the same way.

@ueshin, would you have some input for ^ too?

"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n"
" %s\n"
Member

I think it would be fine to move this line up to the previous one to make it a little more compact, but it's up to you.

Member Author

No problem at all.

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

dtype = {}
if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "false") \
Member

We should use the same default value "true" as the default value defined in SQLConf.

Member Author

Argh, this was my mistake while testing multiple combinations. Will fix it.
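
Presumably the fix just mirrors the SQLConf default when reading the flag on the Python side, along these lines (a fragment of the condition above, not a complete patch):

```python
# Use the same default as SQLConf ("true") instead of "false".
use_fallback = self.sql_ctx.getConf(
    "spark.sql.execution.arrow.fallback.enabled", "true").lower() == "true"
```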

# Fallback to create DataFrame without arrow if raise some exception
from pyspark.util import _exception_message

if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "false") \
Member

ditto.

self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

def test_toPandas_fallback_disabled(self):
with self.sql_conf("spark.sql.execution.arrow.fallback.enabled", False):
Member Author

Hey @ueshin and @BryanCutler, do you guys like this idea?

Member

Seems good, but how about using a dict so multiple configs can be set at the same time?

Member Author

Yea, I was thinking that too. I took a quick look at the rest of the tests and it seems we are fine with a single pair for now. Will fix them in place in the future if you are okay with that too.

Member

Yeah, good idea! +1 on using a dict

Member Author

Will fix it to use a dict here soon.
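
For reference, a rough sketch of the dict-based helper being discussed, restoring the previous values on exit (an illustration only; the helper actually added to the test utilities may differ):

```python
from contextlib import contextmanager


class SQLTestUtils(object):
    """Mixin assumed to expose a `spark` session attribute."""

    @contextmanager
    def sql_conf(self, pairs):
        """Temporarily set SQL configs from a dict, restoring old values on exit."""
        assert isinstance(pairs, dict), "pairs should be a dict of config name to value"
        keys = list(pairs.keys())
        old_values = [self.spark.conf.get(key, None) for key in keys]
        for key, new_value in pairs.items():
            self.spark.conf.set(key, new_value)
        try:
            yield
        finally:
            for key, old_value in zip(keys, old_values):
                if old_value is None:
                    self.spark.conf.unset(key)
                else:
                    self.spark.conf.set(key, old_value)
```

Usage then matches the test snippets below, e.g. `with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}): ...`.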

"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed unexpectedly:\n"
" %s" % _exception_message(e))
Member

No need to mention fallback mode in the message like above?

## Upgrading From Spark SQL 2.3 to 2.4

- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`.
Member

Not only in the migration section; I think we should also document this config in a section like the PySpark Usage Guide for Pandas with Apache Arrow.

Member Author

Yup, added.

Member

@felixcheung felixcheung Mar 7, 2018

`which can be switched by` -> `which can be switched off by`, `which can be switched off with`, or `which can be turned off with`

msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
Member

toPandas attempted Arrow optimization because... repeats three times here, maybe we can dedup it.

Member Author

Hm ... I tried making a "toPandas attempted Arrow optimization because ... %s" prefix and reusing it, but it seems a little bit overkill.
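
For completeness, the shared-prefix idea would look roughly like this (a sketch only; as noted above, the PR keeps the full strings inline, and `_exception_message(e)` is reused from the surrounding code):

```python
_ARROW_ATTEMPT_MSG = (
    "toPandas attempted Arrow optimization because "
    "'spark.sql.execution.arrow.enabled' is set to true; however, ")

# Each call site then only spells out its own suffix:
msg = _ARROW_ATTEMPT_MSG + ("failed by the reason below:\n  %s\n" % _exception_message(e))
```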

.booleanConf
.createWithDefault(false)

val ARROW_FALLBACK_ENABLE =
Member

ARROW_FALLBACK_ENABLED instead of ARROW_FALLBACK_ENABLE?

@SparkQA

SparkQA commented Feb 27, 2018

Test build #87696 has finished for PR 20678 at commit cfb08a1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 27, 2018

Test build #87695 has finished for PR 20678 at commit 7641fd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 27, 2018

Test build #87719 has finished for PR 20678 at commit 229a5f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

timezone = None

if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
should_fallback = False
Contributor

This variable name is a little confusing to me while I'm tracing the code. How about "use_arrow", with the meaning swapped? Because right now, if a user doesn't have Arrow enabled, we skip the Arrow conversion because of the value of should_fallback, which seems.... odd.

"'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
raise RuntimeError(msg)

if not should_fallback:
Contributor

So if I'm tracing the logic correctly: we go down this code path if Arrow optimizations are enabled, there is an exception parsing the schema, and we don't have fallback enabled; or, if we don't have Arrow enabled, we also go down this code path? It might make sense to add a comment here describing when this path is intended to be taken.

Member Author

@HyukjinKwon HyukjinKwon Feb 28, 2018

Correct, but there's one more - we fall back if PyArrow is not installed (or the version doesn't match). Will add some comments to make this easier to read.
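
To keep the intended paths in one place, here is a toy model of the decision logic (names and messages are simplified; this is not the actual toPandas code):

```python
import warnings


def choose_topandas_path(arrow_enabled, fallback_enabled, schema_supported, pyarrow_usable):
    """Toy model: which conversion path toPandas takes, and when it raises."""
    use_arrow = False
    if arrow_enabled:
        if schema_supported and pyarrow_usable:
            use_arrow = True
        elif fallback_enabled:
            # Unsupported schema, missing PyArrow, or a version mismatch: warn and
            # fall back to the plain conversion before any distributed work starts.
            warnings.warn("Arrow optimization could not be used; falling back.")
        else:
            raise RuntimeError("Arrow optimization failed and "
                               "'spark.sql.execution.arrow.fallback.enabled' is false.")
    # Reached with use_arrow=False when Arrow is disabled entirely or the fallback kicked in.
    return "arrow" if use_arrow else "non-arrow"
```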

Contributor

@holdenk holdenk left a comment

Seems like a useful improvement. I've got a few questions as well :)

the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.

In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' will fallback automatically
to non-optimized implementations if an error occurs. This can be controlled by
Contributor

So we need to be clear that we only do this if an error occurs in schema parsing, not any error.

Member Author

Let me try to rephrase this doc a bit. The point I was trying to make with this fallback (for now) is that we only fall back before the actual distributed computation within Spark starts.

"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed unexpectedly:\n %s\n"
"Note that 'spark.sql.execution.arrow.fallback.enabled' does "
Contributor

+1 good job having this explanation in the exception

@HyukjinKwon
Member Author

Will try to clean up soon.

def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED)

def arrowFallbackEnable: Boolean = getConf(ARROW_FALLBACK_ENABLED)
Member

nit: Is this arrowFallbackEnable definition used anywhere?

def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)

def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)
def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLED)
Member

Actually, it seems we don't use arrowEnable either.

@SparkQA

SparkQA commented Feb 28, 2018

Test build #87785 has finished for PR 20678 at commit ed30c20.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

gentle ping, I believe this is ready for another look.

Member

@ueshin ueshin left a comment

LGTM except for a nit.

raise RuntimeError(msg)

# Try to use Arrow optimization when the schema is supported and the required version
# of PyArrow is found, if 'spark.sql.execution.arrow.fallback.enabled' is enabled.
Member

spark.sql.execution.arrow.enabled instead of spark.sql.execution.arrow.fallback.enabled?

Member

@BryanCutler BryanCutler left a comment

LGTM

self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

def test_toPandas_fallback_disabled(self):
with self.sql_conf({"spark.sql.execution.arrow.fallback.enabled": False}):
Member

Do you still want this since it is disabled in setUpClass? It doesn't hurt to have it, but just thought I'd ask

Member Author

Hm .. yup. I don't feel strongly. Will remove it.

@SparkQA

SparkQA commented Mar 5, 2018

Test build #87962 has finished for PR 20678 at commit af60cb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 6, 2018

Test build #88007 has finished for PR 20678 at commit b5bea82.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member

retest this please

@SparkQA

SparkQA commented Mar 6, 2018

Test build #88016 has finished for PR 20678 at commit b5bea82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

the Spark configuration 'spark.sql.execution.arrow.enabled' to 'true'. This is disabled by default.

In addition, optimizations enabled by 'spark.sql.execution.arrow.enabled' could fallback automatically
to non-optimized implementations if an error occurs before the actual computation within Spark.
Member

very minor nit: non-optimized implementations --> non-Arrow optimization implementation

this matches the description in the paragraph below

## Upgrading From Spark SQL 2.3 to 2.4

- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched by `spark.sql.execution.arrow.fallback.enabled`.
Member

@felixcheung felixcheung Mar 7, 2018

`which can be switched by` -> `which can be switched off by`, `which can be switched off with`, or `which can be turned off with`

@SparkQA

SparkQA commented Mar 8, 2018

Test build #88065 has finished for PR 20678 at commit 4ccaa81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member Author

Merged to master.

@asfgit asfgit closed this in d6632d1 Mar 8, 2018
@HyukjinKwon HyukjinKwon deleted the SPARK-23380-conf branch October 16, 2018 12:45