Skip to content

Commit ab5cd2e

Browse files
committed
separate configs for cartesian product operator
1 parent c4c7145 commit ab5cd2e

File tree

4 files changed

+26
-12
lines changed

4 files changed

+26
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,14 @@ object SQLConf {
873873
.intConf
874874
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
875875

876+
val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
877+
buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
878+
.internal()
879+
.doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
880+
"product operator")
881+
.intConf
882+
.createWithDefault(4096)
883+
876884
val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
877885
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
878886
.internal()
@@ -1152,15 +1160,18 @@ class SQLConf extends Serializable with Logging {
11521160

11531161
def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
11541162

1163+
def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1164+
11551165
def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
11561166

1157-
def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1167+
def sortMergeJoinExecBufferInMemoryThreshold: Int =
1168+
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
11581169

11591170
def sortMergeJoinExecBufferSpillThreshold: Int =
11601171
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
11611172

1162-
def sortMergeJoinExecBufferInMemoryThreshold: Int =
1163-
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
1173+
def cartesianProductExecBufferInMemoryThreshold: Int =
1174+
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
11641175

11651176
def cartesianProductExecBufferSpillThreshold: Int =
11661177
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,6 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
6464
numRowsSpillThreshold)
6565
}
6666

67-
def this(numRowsSpillThreshold: Int) {
68-
this(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
69-
}
70-
7167
private val initialSizeOfInMemoryBuffer =
7268
Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
7369

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
3535
left : RDD[UnsafeRow],
3636
right : RDD[UnsafeRow],
3737
numFieldsOfRight: Int,
38+
inMemoryBufferThreshold: Int,
3839
spillThreshold: Int)
3940
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
4041

4142
override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
42-
val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
43+
val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)
4344

4445
val partition = split.asInstanceOf[CartesianPartition]
4546
rdd2.iterator(partition.s2, context).foreach(rowArray.add)
@@ -71,9 +72,12 @@ case class CartesianProductExec(
7172
val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
7273
val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
7374

74-
val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
75-
76-
val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
75+
val pair = new UnsafeCartesianRDD(
76+
leftResults,
77+
rightResults,
78+
right.output.size,
79+
sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
80+
sqlContext.conf.cartesianProductExecBufferSpillThreshold)
7781
pair.mapPartitionsWithIndexInternal { (index, iter) =>
7882
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
7983
val filtered = if (condition.isDefined) {

sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
6767
benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
6868
var sum = 0L
6969
for (_ <- 0L until iterations) {
70-
val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
70+
val array = new ExternalAppendOnlyUnsafeRowArray(
71+
ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
72+
numSpillThreshold)
73+
7174
rows.foreach(x => array.add(x))
7275

7376
val iterator = array.generateIterator()

0 commit comments

Comments
 (0)