Skip to content

Commit 2e26335

Browse files
committed
Revert "Optimization by using radix sort if possible"
This reverts commit c692265.
1 parent c692265 commit 2e26335

File tree

1 file changed

+27
-48
lines changed

1 file changed

+27
-48
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
2929
import org.apache.spark.sql.catalyst.InternalRow
3030
import org.apache.spark.sql.catalyst.errors._
3131
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
32-
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering}
32+
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
3333
import org.apache.spark.sql.catalyst.plans.physical._
3434
import org.apache.spark.sql.execution._
3535
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
@@ -237,57 +237,36 @@ object ShuffleExchangeExec {
237237
// that case all output rows go to the same partition.
238238
val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) {
239239
rdd.mapPartitionsInternal { iter =>
240-
val schema = StructType.fromAttributes(outputAttributes)
241-
val canUseRadixSort = SQLConf.get.enableRadixSort && schema.length == 1 &&
242-
SortPrefixUtils.canSortFullyWithPrefix(schema.head)
243-
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
244-
245-
val sorter = if (canUseRadixSort) {
246-
// For better performance, enable radix sort if possible.
247-
val prefixComputer = SortPrefixUtils.createPrefixGenerator(schema)
248-
val prefixComparator = SortPrefixUtils.getPrefixComparator(schema)
249-
val ordering = GenerateOrdering.create(schema)
250-
251-
UnsafeExternalRowSorter.create(
252-
schema,
253-
ordering,
254-
prefixComparator,
255-
prefixComputer,
256-
pageSize,
257-
true)
258-
} else {
259-
val recordComparatorSupplier = new Supplier[RecordComparator] {
260-
override def get: RecordComparator = new RecordBinaryComparator()
261-
}
262-
// The comparator for comparing row hashcode, which should always be Integer.
263-
val prefixComparator = PrefixComparators.LONG
240+
val recordComparatorSupplier = new Supplier[RecordComparator] {
241+
override def get: RecordComparator = new RecordBinaryComparator()
242+
}
243+
// The comparator for comparing row hashcode, which should always be Integer.
244+
val prefixComparator = PrefixComparators.LONG
264245

265-
// The prefix computer generates row hashcode as the prefix, so we may decrease the
266-
// probability that the prefixes are equal when input rows choose column values from a
267-
// limited range.
268-
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
269-
private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
270-
override def computePrefix(row: InternalRow):
271-
UnsafeExternalRowSorter.PrefixComputer.Prefix = {
272-
// The hashcode generated from the binary form of a [[UnsafeRow]] should not
273-
// be null.
274-
result.isNull = false
275-
result.value = row.hashCode()
276-
result
277-
}
246+
// The prefix computer generates row hashcode as the prefix, so we may decrease the
247+
// probability that the prefixes are equal when input rows choose column values from a
248+
// limited range.
249+
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
250+
private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
251+
override def computePrefix(row: InternalRow):
252+
UnsafeExternalRowSorter.PrefixComputer.Prefix = {
253+
// The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
254+
result.isNull = false
255+
result.value = row.hashCode()
256+
result
278257
}
279-
280-
UnsafeExternalRowSorter.createWithRecordComparator(
281-
schema,
282-
recordComparatorSupplier,
283-
prefixComparator,
284-
prefixComputer,
285-
pageSize,
286-
// We are comparing binary here, which does not support radix sort.
287-
// See more details in SPARK-28699.
288-
false)
289258
}
259+
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
290260

261+
val sorter = UnsafeExternalRowSorter.createWithRecordComparator(
262+
StructType.fromAttributes(outputAttributes),
263+
recordComparatorSupplier,
264+
prefixComparator,
265+
prefixComputer,
266+
pageSize,
267+
// We are comparing binary here, which does not support radix sort.
268+
// See more details in SPARK-28699.
269+
false)
291270
sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
292271
}
293272
} else {

0 commit comments

Comments
 (0)