Skip to content

Commit 5028a00

Browse files
kevinyu98rxin
authored andcommitted
[SPARK-12317][SQL] Support units (m,k,g) in SQLConf
This PR is continue from previous closed PR 10314. In this PR, SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE will be taken memory string conventions as input. For example, the user can now specify 10g for SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE in SQLConf file. marmbrus srowen : Can you help review this code changes ? Thanks. Author: Kevin Yu <[email protected]> Closes #10629 from kevinyu98/spark-12317.
1 parent 28e0e50 commit 5028a00

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
2626

2727
import org.apache.spark.sql.catalyst.CatalystConf
2828
import org.apache.spark.sql.catalyst.parser.ParserConf
29+
import org.apache.spark.util.Utils
2930

3031
////////////////////////////////////////////////////////////////////////////////////////////////////
3132
// This file defines the configuration options for Spark SQL.
@@ -115,6 +116,25 @@ private[spark] object SQLConf {
115116
}
116117
}, _.toString, doc, isPublic)
117118

119+
def longMemConf(
120+
key: String,
121+
defaultValue: Option[Long] = None,
122+
doc: String = "",
123+
isPublic: Boolean = true): SQLConfEntry[Long] =
124+
SQLConfEntry(key, defaultValue, { v =>
125+
try {
126+
v.toLong
127+
} catch {
128+
case _: NumberFormatException =>
129+
try {
130+
Utils.byteStringAsBytes(v)
131+
} catch {
132+
case _: NumberFormatException =>
133+
throw new IllegalArgumentException(s"$key should be long, but was $v")
134+
}
135+
}
136+
}, _.toString, doc, isPublic)
137+
118138
def doubleConf(
119139
key: String,
120140
defaultValue: Option[Double] = None,
@@ -235,7 +255,7 @@ private[spark] object SQLConf {
235255
doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
236256

237257
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
238-
longConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
258+
longMemConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
239259
defaultValue = Some(64 * 1024 * 1024),
240260
doc = "The target post-shuffle input size in bytes of a task.")
241261

sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,43 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
9292
}
9393
assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
9494
}
95+
96+
test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") {
97+
sqlContext.conf.clear()
98+
99+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100")
100+
assert(sqlContext.conf.targetPostShuffleInputSize === 100)
101+
102+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1k")
103+
assert(sqlContext.conf.targetPostShuffleInputSize === 1024)
104+
105+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1M")
106+
assert(sqlContext.conf.targetPostShuffleInputSize === 1048576)
107+
108+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "1g")
109+
assert(sqlContext.conf.targetPostShuffleInputSize === 1073741824)
110+
111+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1")
112+
assert(sqlContext.conf.targetPostShuffleInputSize === -1)
113+
114+
// Test overflow exception
115+
intercept[IllegalArgumentException] {
116+
// This value exceeds Long.MaxValue
117+
// Utils.byteStringAsBytes("90000000000g")
118+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "90000000000g")
119+
}
120+
121+
intercept[IllegalArgumentException] {
122+
// This value less than Int.MinValue
123+
// Utils.byteStringAsBytes("-90000000000g")
124+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g")
125+
}
126+
// Test invalid input
127+
intercept[IllegalArgumentException] {
128+
// This value exceeds Long.MaxValue
129+
// Utils.byteStringAsBytes("-1g")
130+
sqlContext.setConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-1g")
131+
}
132+
sqlContext.conf.clear()
133+
}
95134
}

0 commit comments

Comments
 (0)