From 1256c87e0bc62518472fe3072c968c1af69a87c9 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 19 Aug 2019 16:00:21 +0800 Subject: [PATCH 1/6] [SPARK-28699][Core] Disable using radix sort for ShuffleExchangeExec in repartition case --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index fec05a76b4516..b83de0e39fb6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -242,7 +242,7 @@ object ShuffleExchangeExec { } // The comparator for comparing row hashcode, which should always be Integer. val prefixComparator = PrefixComparators.LONG - val canUseRadixSort = SQLConf.get.enableRadixSort + // The prefix computer generates row hashcode as the prefix, so we may decrease the // probability that the prefixes are equal when input rows choose column values from a // limited range. @@ -264,7 +264,7 @@ object ShuffleExchangeExec { prefixComparator, prefixComputer, pageSize, - canUseRadixSort) + false /* canUseRadixSort */) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } } else { From 398a891cd31dd9c275717f5f37d509dec9f2de64 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 19 Aug 2019 22:44:41 +0800 Subject: [PATCH 2/6] More comments --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index b83de0e39fb6d..02f7b419bd7fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -264,6 +264,10 @@ object ShuffleExchangeExec { prefixComparator, prefixComputer, pageSize, + // As we need to compare the binary of UnsafeRow here, we can't make sure whether all + // the fields can sort fully with prefix like SortExec. So we disable radix sort here + // to avoid getting unstable sort, and result to a correctness bug. + // See more details in SPARK-28699. false /* canUseRadixSort */) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } From ec57677bf9e6136812ae0e618d8785d7e664429b Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 20 Aug 2019 10:22:13 +0800 Subject: [PATCH 3/6] comment address --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 02f7b419bd7fc..3f4a234b2e51e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -268,7 +268,7 @@ object ShuffleExchangeExec { // the fields can sort fully with prefix like SortExec. So we disable radix sort here // to avoid getting unstable sort, and result to a correctness bug. // See more details in SPARK-28699. - false /* canUseRadixSort */) + false) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } } else { From 8bf4cc82d8f355554837c0f7cbbeb4632b1596eb Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 20 Aug 2019 11:14:57 +0800 Subject: [PATCH 4/6] fix comment --- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 3f4a234b2e51e..2f4c5734469f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -264,9 +264,7 @@ object ShuffleExchangeExec { prefixComparator, prefixComputer, pageSize, - // As we need to compare the binary of UnsafeRow here, we can't make sure whether all - // the fields can sort fully with prefix like SortExec. So we disable radix sort here - // to avoid getting unstable sort, and result to a correctness bug. + // We are comparing binary here, which does not support radix sort. // See more details in SPARK-28699. false) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) From c69226578573753eeba9468d9dff113a1c113c3d Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 20 Aug 2019 19:14:44 +0800 Subject: [PATCH 5/6] Optimization by using radix sort if possible --- .../exchange/ShuffleExchangeExec.scala | 75 ++++++++++++------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 2f4c5734469f8..34c41cdce51f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} @@ -237,36 +237,57 @@ object ShuffleExchangeExec { // that case all output rows go to the same partition. val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) { rdd.mapPartitionsInternal { iter => - val recordComparatorSupplier = new Supplier[RecordComparator] { - override def get: RecordComparator = new RecordBinaryComparator() - } - // The comparator for comparing row hashcode, which should always be Integer. - val prefixComparator = PrefixComparators.LONG + val schema = StructType.fromAttributes(outputAttributes) + val canUseRadixSort = SQLConf.get.enableRadixSort && schema.length == 1 && + SortPrefixUtils.canSortFullyWithPrefix(schema.head) + val pageSize = SparkEnv.get.memoryManager.pageSizeBytes - // The prefix computer generates row hashcode as the prefix, so we may decrease the - // probability that the prefixes are equal when input rows choose column values from a - // limited range. - val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { - private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix - override def computePrefix(row: InternalRow): - UnsafeExternalRowSorter.PrefixComputer.Prefix = { - // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null. - result.isNull = false - result.value = row.hashCode() - result + val sorter = if (canUseRadixSort) { + // For better performance, enable radix sort if possible. + val prefixComputer = SortPrefixUtils.createPrefixGenerator(schema) + val prefixComparator = SortPrefixUtils.getPrefixComparator(schema) + val ordering = GenerateOrdering.create(schema) + + UnsafeExternalRowSorter.create( + schema, + ordering, + prefixComparator, + prefixComputer, + pageSize, + true) + } else { + val recordComparatorSupplier = new Supplier[RecordComparator] { + override def get: RecordComparator = new RecordBinaryComparator() } + // The comparator for comparing row hashcode, which should always be Integer. + val prefixComparator = PrefixComparators.LONG + + // The prefix computer generates row hashcode as the prefix, so we may decrease the + // probability that the prefixes are equal when input rows choose column values from a + // limited range. + val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { + private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix + override def computePrefix(row: InternalRow): + UnsafeExternalRowSorter.PrefixComputer.Prefix = { + // The hashcode generated from the binary form of a [[UnsafeRow]] should not + // be null. + result.isNull = false + result.value = row.hashCode() + result + } + } + + UnsafeExternalRowSorter.createWithRecordComparator( + schema, + recordComparatorSupplier, + prefixComparator, + prefixComputer, + pageSize, + // We are comparing binary here, which does not support radix sort. + // See more details in SPARK-28699. + false) } - val pageSize = SparkEnv.get.memoryManager.pageSizeBytes - val sorter = UnsafeExternalRowSorter.createWithRecordComparator( - StructType.fromAttributes(outputAttributes), - recordComparatorSupplier, - prefixComparator, - prefixComputer, - pageSize, - // We are comparing binary here, which does not support radix sort. - // See more details in SPARK-28699. - false) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } } else { From 2e26335b731b8a99c167857c574f1f7fe8a4d809 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 21 Aug 2019 11:35:26 +0800 Subject: [PATCH 6/6] Revert "Optimization by using radix sort if possible" This reverts commit c69226578573753eeba9468d9dff113a1c113c3d. --- .../exchange/ShuffleExchangeExec.scala | 75 +++++++------------ 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 34c41cdce51f1..2f4c5734469f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} -import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering} +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} @@ -237,57 +237,36 @@ object ShuffleExchangeExec { // that case all output rows go to the same partition. val newRdd = if (isRoundRobin && SQLConf.get.sortBeforeRepartition) { rdd.mapPartitionsInternal { iter => - val schema = StructType.fromAttributes(outputAttributes) - val canUseRadixSort = SQLConf.get.enableRadixSort && schema.length == 1 && - SortPrefixUtils.canSortFullyWithPrefix(schema.head) - val pageSize = SparkEnv.get.memoryManager.pageSizeBytes - - val sorter = if (canUseRadixSort) { - // For better performance, enable radix sort if possible. - val prefixComputer = SortPrefixUtils.createPrefixGenerator(schema) - val prefixComparator = SortPrefixUtils.getPrefixComparator(schema) - val ordering = GenerateOrdering.create(schema) - - UnsafeExternalRowSorter.create( - schema, - ordering, - prefixComparator, - prefixComputer, - pageSize, - true) - } else { - val recordComparatorSupplier = new Supplier[RecordComparator] { - override def get: RecordComparator = new RecordBinaryComparator() - } - // The comparator for comparing row hashcode, which should always be Integer. - val prefixComparator = PrefixComparators.LONG + val recordComparatorSupplier = new Supplier[RecordComparator] { + override def get: RecordComparator = new RecordBinaryComparator() + } + // The comparator for comparing row hashcode, which should always be Integer. + val prefixComparator = PrefixComparators.LONG - // The prefix computer generates row hashcode as the prefix, so we may decrease the - // probability that the prefixes are equal when input rows choose column values from a - // limited range. - val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { - private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix - override def computePrefix(row: InternalRow): - UnsafeExternalRowSorter.PrefixComputer.Prefix = { - // The hashcode generated from the binary form of a [[UnsafeRow]] should not - // be null. - result.isNull = false - result.value = row.hashCode() - result - } + // The prefix computer generates row hashcode as the prefix, so we may decrease the + // probability that the prefixes are equal when input rows choose column values from a + // limited range. + val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { + private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix + override def computePrefix(row: InternalRow): + UnsafeExternalRowSorter.PrefixComputer.Prefix = { + // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null. + result.isNull = false + result.value = row.hashCode() + result } - - UnsafeExternalRowSorter.createWithRecordComparator( - schema, - recordComparatorSupplier, - prefixComparator, - prefixComputer, - pageSize, - // We are comparing binary here, which does not support radix sort. - // See more details in SPARK-28699. - false) } + val pageSize = SparkEnv.get.memoryManager.pageSizeBytes + val sorter = UnsafeExternalRowSorter.createWithRecordComparator( + StructType.fromAttributes(outputAttributes), + recordComparatorSupplier, + prefixComparator, + prefixComputer, + pageSize, + // We are comparing binary here, which does not support radix sort. + // See more details in SPARK-28699. + false) sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) } } else {