Commit f8bf2b0
[SC-5628] Make sure commit protocol confs can be set via spark.*

## What changes were proposed in this pull request?

Spark confs need to be prefixed by spark.* in order to be set via the config UI.

## How was this patch tested?

Tested manually.

Author: Eric Liang <[email protected]>

Closes apache#177 from ericl/fix-conf.
1 parent 74d1d9e commit f8bf2b0
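With the `spark.` prefix, these escape hatches behave like any other Spark conf and can be set through the usual channels. A minimal sketch (not part of this commit; the master and app name are illustrative) of setting them at session build time, which places them on the `SparkConf` that `SparkEnv.get.conf` later reads:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("commit-protocol-conf-demo") // illustrative name
  // Keep uncommitted files visible in listings (flag defaults to true).
  .config("spark.databricks.sql.enableFilterUncommitted", "false")
  // Tolerate corrupt commit markers instead of failing (defaults to false).
  .config("spark.databricks.sql.ignoreCorruptCommitMarkers", "true")
  .getOrCreate()
```

Equivalently, the keys can be passed on the command line, e.g. `spark-submit --conf spark.databricks.sql.enableFilterUncommitted=false`.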

2 files changed (+7, -7)

sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala (3 additions, 3 deletions)
```diff
@@ -51,7 +51,7 @@ object DatabricksAtomicReadProtocol extends Logging {
   def filterDirectoryListing(
       fs: FileSystem, dir: Path, initialFiles: Seq[FileStatus]): Seq[FileStatus] = {
     // we use SparkEnv for this escape-hatch flag since this may be called on executors
-    if (!SparkEnv.get.conf.getBoolean("com.databricks.sql.enableFilterUncommitted", true)) {
+    if (!SparkEnv.get.conf.getBoolean("spark.databricks.sql.enableFilterUncommitted", true)) {
       return initialFiles
     }

@@ -151,7 +151,7 @@ object DatabricksAtomicReadProtocol extends Logging {

     // Optimization: can assume the list request was atomic if the files have not changed recently.
     val horizonMillis = SparkEnv.get.conf.getLong(
-      "com.databricks.sql.writeReorderingHorizonMillis", 5 * 60 * 1000)
+      "spark.databricks.sql.writeReorderingHorizonMillis", 5 * 60 * 1000)

     if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) &&
         state.lastModified > clock.getTimeMillis - horizonMillis) {

@@ -235,7 +235,7 @@ object DatabricksAtomicReadProtocol extends Logging {
       case NonFatal(e) =>
         // we use SparkEnv for this escape-hatch flag since this may be called on executors
         if (SparkEnv.get.conf.getBoolean(
-            "com.databricks.sql.ignoreCorruptCommitMarkers", false)) {
+            "spark.databricks.sql.ignoreCorruptCommitMarkers", false)) {
          logWarning("Failed to read job commit marker: " + stat, e)
          corruptCommitMarkers += txnId
        } else {
```
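For context on why the flags live here: `SparkEnv.get.conf` returns the application's `SparkConf` and is available on executors, so these flags can be read inside tasks where the session's `SQLConf` is not accessible. A minimal sketch of that read pattern (the helper name is illustrative, not from the patch):

```scala
import org.apache.spark.SparkEnv

// Illustrative helper, not part of the patch: read the escape-hatch flag the
// same way DatabricksAtomicReadProtocol does, enabled by default.
def filterUncommittedEnabled(): Boolean =
  SparkEnv.get.conf.getBoolean("spark.databricks.sql.enableFilterUncommitted", defaultValue = true)
```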

sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala (4 additions, 4 deletions)
```diff
@@ -37,11 +37,11 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContext {
       create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv")
       create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv")
       try {
-        SparkEnv.get.conf.set("com.databricks.sql.enableFilterUncommitted", "false")
+        SparkEnv.get.conf.set("spark.databricks.sql.enableFilterUncommitted", "false")
         assert(spark.read.csv(dir.getAbsolutePath).count == 2)
         assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2)
       } finally {
-        SparkEnv.get.conf.remove("com.databricks.sql.enableFilterUncommitted")
+        SparkEnv.get.conf.remove("spark.databricks.sql.enableFilterUncommitted")
       }
     }
   }

@@ -171,10 +171,10 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContext {
       }
       assert(error.getMessage.contains("Failed to read job commit marker"))
       try {
-        SparkEnv.get.conf.set("com.databricks.sql.ignoreCorruptCommitMarkers", "true")
+        SparkEnv.get.conf.set("spark.databricks.sql.ignoreCorruptCommitMarkers", "true")
         assert(spark.read.csv(dir.getAbsolutePath).count == 1)
       } finally {
-        SparkEnv.get.conf.remove("com.databricks.sql.ignoreCorruptCommitMarkers")
+        SparkEnv.get.conf.remove("spark.databricks.sql.ignoreCorruptCommitMarkers")
       }
     }
   }
```
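The tests wrap each conf mutation in try/finally so the flag never leaks into other tests sharing the same `SparkEnv`. A hypothetical helper (not in this patch; the name `withSparkEnvConf` is invented) distilling that pattern:

```scala
import org.apache.spark.SparkEnv

// Hypothetical helper, not part of the patch: set a SparkEnv conf for the
// duration of `body`, then remove it even if `body` throws.
def withSparkEnvConf[T](key: String, value: String)(body: => T): T = {
  SparkEnv.get.conf.set(key, value)
  try body finally SparkEnv.get.conf.remove(key)
}

// Example: the first test above could be expressed as
// withSparkEnvConf("spark.databricks.sql.enableFilterUncommitted", "false") {
//   assert(spark.read.csv(dir.getAbsolutePath).count == 2)
// }
```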
