
Conversation

@dongjoon-hyun (Member) commented Sep 1, 2018

What changes were proposed in this pull request?

In both ORC data sources, the createFilter function has exponential time complexity due to its skewed filter tree generation. This PR aims to fix it by using a new buildTree function.
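The idea of buildTree, roughly, is to combine the convertible filters with a balanced divide-and-conquer instead of a left-deep fold, so the resulting And tree has logarithmic depth. A minimal illustrative sketch (the name follows this PR, but the exact implementation may differ):

import org.apache.spark.sql.sources.{And, Filter}

// Build a balanced And-tree: split the filters in half, recurse on each half,
// and join the halves with a single And. The result has O(log n) depth instead
// of the O(n) depth produced by reduceOption(And).
def buildTree(filters: Seq[Filter]): Option[Filter] = filters match {
  case Seq() => None
  case Seq(filter) => Some(filter)
  case _ =>
    val (left, right) = filters.splitAt(filters.length / 2)
    Some(And(buildTree(left).get, buildTree(right).get))
}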

REPRODUCE

// Create and read 1 row table with 1000 columns
sql("set spark.sql.orc.filterPushdown=true")
val selectExpr = (1 to 1000).map(i => s"id c$i")
spark.range(1).selectExpr(selectExpr: _*).write.mode("overwrite").orc("/tmp/orc")
print(s"With 0 filters, ")
spark.time(spark.read.orc("/tmp/orc").count)

// Increase the number of filters
(20 to 30).foreach { width =>
  val whereExpr = (1 to width).map(i => s"c$i is not null").mkString(" and ")
  print(s"With $width filters, ")
  spark.time(spark.read.orc("/tmp/orc").where(whereExpr).count)
}

RESULT

With 0 filters, Time taken: 653 ms                                              
With 20 filters, Time taken: 962 ms
With 21 filters, Time taken: 1282 ms
With 22 filters, Time taken: 1982 ms
With 23 filters, Time taken: 3855 ms
With 24 filters, Time taken: 6719 ms
With 25 filters, Time taken: 12669 ms
With 26 filters, Time taken: 25032 ms
With 27 filters, Time taken: 49585 ms
With 28 filters, Time taken: 98980 ms    // over 1 min 38 seconds
With 29 filters, Time taken: 198368 ms   // over 3 mins
With 30 filters, Time taken: 393744 ms   // over 6 mins

AFTER THIS PR

With 0 filters, Time taken: 774 ms
With 20 filters, Time taken: 601 ms
With 21 filters, Time taken: 399 ms
With 22 filters, Time taken: 679 ms
With 23 filters, Time taken: 363 ms
With 24 filters, Time taken: 342 ms
With 25 filters, Time taken: 336 ms
With 26 filters, Time taken: 352 ms
With 27 filters, Time taken: 322 ms
With 28 filters, Time taken: 302 ms
With 29 filters, Time taken: 307 ms
With 30 filters, Time taken: 301 ms

How was this patch tested?

Pass the Jenkins with newly added test cases.

@dongjoon-hyun changed the title from "[SPARK-25306][SQL] Use cache to speed up createFilter" to "[SPARK-25306][SQL] Use cache to speed up createFilter in ORC" on Sep 1, 2018
@SparkQA commented Sep 2, 2018

Test build #95583 has finished for PR 22313 at commit ac06b0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])
  • case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])

@dongjoon-hyun (Member Author):

Could you review this PR, @gatorsmile and @cloud-fan ?

}
})

private def getOrBuildSearchArgumentWithNewBuilder(
Member:

Just a small question: is it possible to reuse code with https://github.com/apache/spark/pull/22313/files#diff-224b8cbedf286ecbfdd092d1e2e2f237R61?

@dongjoon-hyun (Member Author) commented Sep 2, 2018

@xuanyuanking, this already reuses cacheExpireTimeout.

For the cache value, SearchArgument, SearchArgumentFactory and Builder are different classes. (They only share the same names.)

  • Here, they come from org.apache.hadoop.hive.ql.io.sarg.*.
  • There, they come from org.apache.orc.storage.ql.io.sarg.*.

The only exception I made is FilterWithTypeMap. I wanted to keep them separate since it's also related to the cache key.

@kiszk (Member) commented Sep 2, 2018

General question: why do we use time instead of entry size to control the cache? I am neutral on this decision; I would just like to hear the reason for it.

@dongjoon-hyun (Member Author) commented Sep 2, 2018

Thank you for the review, @kiszk.

First, I don't want to hold on to the memory after query completion. If we did, it would be a regression. So I preferred a time-based policy.

Second, it's difficult to estimate a sufficient limit for the number of filters.

  • As we know from several codegen JVM limit issues, there are attempts to generate a single complex query over wide tables (thousands of columns).
  • Spark optimizer rules like InferFiltersFromConstraints add more constraints such as `IsNotNull(col1)`. Usually the number of filters doubles here.
  • Also, it's not a good design if we need to raise this limit whenever we add a new optimizer rule like InferFiltersFromConstraints.
  • If the limit is too high, we waste memory; if it is too small, eviction bites us again.

In short, a time-based policy was sufficient and the simplest option for this purpose.
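To make the trade-off concrete, here is a minimal sketch of what such a time-based cache could look like. This is illustrative only: it assumes Guava's LoadingCache, the FilterWithTypeMap key from this PR's early revision, a cacheExpireTimeout in seconds, and the existing buildSearchArgument helper; the final version of the PR dropped the cache in favor of buildTree.

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.DataType

// Hypothetical cache key: a pushed-down filter plus the column types it refers to.
case class FilterWithTypeMap(filter: Filter, typeMap: Map[String, DataType])

// Entries expire `cacheExpireTimeout` seconds after the last access, so memory is
// released soon after a query finishes instead of being bounded by an entry count.
// `buildSearchArgument(dataTypeMap, filter, builder): Option[Builder]` is assumed to be
// the existing OrcFilters helper shown in the diffs above.
def buildSearchArgumentCache(
    cacheExpireTimeout: Long): LoadingCache[FilterWithTypeMap, Option[Builder]] = {
  CacheBuilder.newBuilder()
    .expireAfterAccess(cacheExpireTimeout, TimeUnit.SECONDS)
    .build(new CacheLoader[FilterWithTypeMap, Option[Builder]]() {
      override def load(key: FilterWithTypeMap): Option[Builder] =
        buildSearchArgument(key.typeMap, key.filter, SearchArgumentFactory.newBuilder())
    })
}

With spark.sql.orc.cache.sarg.timeout=0, the loading cache would simply not be created at all, as discussed below.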

if (cacheExpireTimeout > 0) {
searchArgumentCache.get(FilterWithTypeMap(expression, dataTypeMap))
} else {
buildSearchArgument(dataTypeMap, expression, SearchArgumentFactory.newBuilder())
Member:

When we set the timeout to zero on the cache, the loaded element can be removed immediately. Maybe we don't need to check the timeout like this, and we can simplify the code.

@dongjoon-hyun (Member Author):

Ya, it's possible. But if we create a Guava loading cache and go through all the cache management logic in Guava, it means more overhead than this PR. In this PR, spark.sql.orc.cache.sarg.timeout=0 means not creating the loading cache at all.

if (cacheExpireTimeout > 0) {
// Build in a bottom-up manner
getOrBuildSearchArgumentWithNewBuilder(dataTypeMap, newFilter)
}
Member:

Why do we need to cache all sub-filters? Don't we just need to cache the final conjunction?

@dongjoon-hyun (Member Author):

Final conjunction? All sub-function results will be cached in the end.

@cloud-fan (Contributor):

Do you know why the createFilter function has exponential time complexity? Let's make sure the algorithm is good before adding a cache.

@dongjoon-hyun (Member Author) commented Sep 3, 2018

Thank you for the review and advice, @cloud-fan. It turns out that my initial assessment was not sufficient.

First of all, from the beginning, SPARK-2883 was designed as a recursive function like the following. Please see tryLeft and tryRight. It is a pure computation to check whether it succeeds; there is no reuse here. So I tried to cache the first two tryLeft and tryRight operations since they can be reused.

val tryLeft = buildSearchArgument(left, newBuilder)
val tryRight = buildSearchArgument(right, newBuilder)
val conjunction = for {
  _ <- tryLeft
  _ <- tryRight
  lhs <- buildSearchArgument(left, builder.startAnd())
  rhs <- buildSearchArgument(right, lhs)
} yield rhs.end()

However, before that, createFilter generates the target tree with reduceOption(And) as a deeply skewed tree. That was the root cause. I'll update this PR.
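For context, a rough back-of-the-envelope recurrence (based only on the quoted code above, so treat it as a sketch rather than a precise analysis): an And node runs buildSearchArgument twice on each child, once to probe whether the child is convertible (tryLeft/tryRight) and once to actually build it, so its cost is roughly 2·T(left) + 2·T(right). On the left-deep tree produced by reduceOption(And) with n filters, this gives T(n) ≈ 2·T(n−1) + c, i.e. O(2^n), which matches the benchmark where each additional filter roughly doubles the time. On a balanced tree, the same recurrence becomes T(n) ≈ 4·T(n/2) + c, i.e. O(n²).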

@dongjoon-hyun changed the title from "[SPARK-25306][SQL] Use cache to speed up createFilter in ORC" to "[SPARK-25306][SQL] Avoid skewed filter trees to speed up createFilter in ORC" on Sep 3, 2018
@SparkQA commented Sep 4, 2018

Test build #95637 has finished for PR 22313 at commit 4acbaf8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val schema = new StructType(Array(StructField("a", IntegerType, nullable = true)))
val filters = (1 to 2000).map(LessThan("a", _)).toArray[Filter]
failAfter(2 seconds) {
OrcFilters.createFilter(schema, filters)
Contributor:

This test looks tricky... It's bad practice to assume some code will return within a certain time. Can we just add a microbenchmark for it?

@dongjoon-hyun (Member Author) commented Sep 4, 2018

Sure.

  1. Something like the test code in the PR description? And marked as ignore(...) instead of test(...) here?
  2. Or, do you want another test case in FilterPushdownBenchmark?

@dongjoon-hyun (Member Author):

I'll choose (2), @cloud-fan.

for {
// Combines all convertible filters using `And` to produce a single conjunction
conjunction <- convertibleFilters.reduceOption(org.apache.spark.sql.sources.And)
conjunction <- buildTree(convertibleFilters)
Contributor:

Does Parquet have the same problem?

Contributor:

In Parquet, this is done as

filters
  .flatMap(ParquetFilters.createFilter(requiredSchema, _))
  .reduceOption(FilterApi.and)

can we follow it?

@dongjoon-hyun (Member Author) commented Sep 4, 2018

For the first question, I don't think Parquet has the same issue, because Parquet uses canMakeFilterOn while ORC tries to build a full result (with a fresh builder) to check whether it's okay or not.

For the second question,

  1. In ORC, we did the first half (flatMap) to compute convertibleFilters, but we can change it to filters.filter. I'll update it like that:
val convertibleFilters = for {
    filter <- filters
    _ <- buildSearchArgument(dataTypeMap, filter, SearchArgumentFactory.newBuilder())
} yield filter
  2. The second half, reduceOption(FilterApi.and), was the original ORC code which generated a skewed tree with exponential time complexity. We need to use buildTree.

@dongjoon-hyun (Member Author):

BTW, Parquet has another issue here due to .reduceOption(FilterApi.and). When I was making a benchmark, Parquet seemed to be unable to handle 1000 filters, @cloud-fan.

withTempPath { dir =>
val columns = (1 to width).map(i => s"id c$i")
val df = spark.range(1).selectExpr(columns: _*)
withTempTable("orcTable", "patquetTable") {
Member:

nit: a typo, patquetTable.

@dongjoon-hyun (Member Author):

Oh, thanks!

@SparkQA commented Sep 4, 2018

Test build #95651 has finished for PR 22313 at commit 5c46693.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2018

Test build #95652 has finished for PR 22313 at commit 4a372a3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

} yield builder.build()
buildTree(filters.filter(buildSearchArgument(dataTypeMap, _, newBuilder).isDefined))
.flatMap(buildSearchArgument(dataTypeMap, _, newBuilder))
.map(_.build)
Contributor:

Ah, I see what you mean now. Can we restore the previous version? That seems better. Sorry for the back and forth!

@dongjoon-hyun (Member Author):

Sure. No problem, @cloud-fan. :)

@SparkQA commented Sep 4, 2018

Test build #95658 has finished for PR 22313 at commit 4a372a3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2018

Test build #95665 has finished for PR 22313 at commit 3cd4443.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author):

Retest this please.

@SparkQA commented Sep 4, 2018

Test build #95669 has finished for PR 22313 at commit 3cd4443.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author):

Retest this please.

@dongjoon-hyun (Member Author):

The previous failures are irrelevant to this PR.

  • org.apache.spark.sql.execution.streaming.HDFSMetadataLogSuite.HDFSMetadataLog: metadata directory collision
  • org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.SuiteSelector)
  • org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.SuiteSelector)

@SparkQA commented Sep 4, 2018

Test build #95680 has finished for PR 22313 at commit 3cd4443.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author):

Retest this please.

@SparkQA commented Sep 5, 2018

Test build #95685 has finished for PR 22313 at commit 3cd4443.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

thanks, merging to master!

@cloud-fan (Contributor):

@dongjoon-hyun please also update the title of the JIRA ticket, thanks!

@asfgit closed this in 103f513 on Sep 5, 2018
@dongjoon-hyun (Member Author):

Thank you, @cloud-fan. Sure, I'll update them.

@dongjoon-hyun deleted the SPARK-25306 branch on September 5, 2018 at 02:36
@dongjoon-hyun (Member Author):

Also, thank you for the review, @xuanyuanking, @kiszk, @viirya, @HyukjinKwon.
