
Commit fe904e6
[SPARK-49709][CONNECT][SQL] Support ConfigEntry in the RuntimeConfig interface
### What changes were proposed in this pull request?

This PR adds support for ConfigEntry to the RuntimeConfig interface. This was removed in apache#47980.

### Why are the changes needed?

This functionality is used a lot by Spark libraries. Removing it caused friction, and adding it back does not pollute the RuntimeConfig interface.

### Does this PR introduce _any_ user-facing change?

No. This is developer API.

### How was this patch tested?

I have added test cases for Connect and Classic.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49062 from hvanhovell/SPARK-49709.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
1 parent 4248397 commit fe904e6
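
For orientation, here is a minimal sketch of what the restored API enables. The key name `my.lib.maxRetries` is hypothetical, and because the new overloads are `private[sql]`, the calling code is assumed to live under the `org.apache.spark.sql` package:

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.sql.SparkSession

// Hypothetical typed entry: an Int config with a built-in default of 3.
val maxRetries = ConfigBuilder("my.lib.maxRetries").intConf.createWithDefault(3)

val spark = SparkSession.builder().getOrCreate()
spark.conf.set(maxRetries, 5)           // typed set via ConfigEntry[Int]
val n: Int = spark.conf.get(maxRetries) // typed get; yields the default (3) when unset
```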

File tree

12 files changed: +201 −40 lines changed


core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala renamed to common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -24,7 +24,7 @@ import scala.util.matching.Regex
 
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkStringUtils
 
 private object ConfigHelpers {
 
@@ -47,7 +47,7 @@
   }
 
   def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
-    Utils.stringToSeq(str).map(converter)
+    SparkStringUtils.stringToSeq(str).map(converter)
   }
 
   def seqToString[T](v: Seq[T], stringConverter: T => String): String = {
```

core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala renamed to common/utils/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala

Lines changed: 0 additions & 17 deletions
```diff
@@ -19,8 +19,6 @@ package org.apache.spark.internal.config
 
 import java.util.{Map => JMap}
 
-import org.apache.spark.SparkConf
-
 /**
  * A source of configuration values.
  */
@@ -47,18 +45,3 @@ private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
   override def get(key: String): Option[String] = Option(conf.get(key))
 
 }
-
-/**
- * A config provider that only reads Spark config keys.
- */
-private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
-
-  override def get(key: String): Option[String] = {
-    if (key.startsWith("spark.")) {
-      Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf))
-    } else {
-      None
-    }
-  }
-
-}
```
common/utils/src/main/scala/org/apache/spark/util/SparkStringUtils.scala (new file)

Lines changed: 26 additions & 0 deletions

```diff
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+trait SparkStringUtils {
+  def stringToSeq(str: String): Seq[String] = {
+    import org.apache.spark.util.ArrayImplicits._
+    str.split(",").map(_.trim()).filter(_.nonEmpty).toImmutableArraySeq
+  }
+}
+
+object SparkStringUtils extends SparkStringUtils
```
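
A quick behavior sketch of the relocated helper, with expected results in comments:

```scala
import org.apache.spark.util.SparkStringUtils

// Splits on commas, trims each token, and drops empty entries.
SparkStringUtils.stringToSeq(" a, b ,, c ") // Seq("a", "b", "c")
SparkStringUtils.stringToSeq("")            // Seq()
```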

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/internal/ConnectRuntimeConfig.scala

Lines changed: 47 additions & 15 deletions
```diff
@@ -18,6 +18,7 @@ package org.apache.spark.sql.internal
 
 import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{ConfigEntry, ConfigReader, OptionalConfigEntry}
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.connect.client.SparkConnectClient
 
@@ -28,7 +29,7 @@
  */
 class ConnectRuntimeConfig private[sql] (client: SparkConnectClient)
     extends RuntimeConfig
-    with Logging {
+    with Logging { self =>
 
   /** @inheritdoc */
   def set(key: String, value: String): Unit = {
@@ -37,6 +38,13 @@ class ConnectRuntimeConfig private[sql] (client: SparkConnectClient)
     }
   }
 
+  /** @inheritdoc */
+  override private[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
+    require(entry != null, "entry cannot be null")
+    require(value != null, s"value cannot be null for key: ${entry.key}")
+    set(entry.key, entry.stringConverter(value))
+  }
+
   /** @inheritdoc */
   @throws[NoSuchElementException]("if the key is not set and there is no default value")
   def get(key: String): String = getOption(key).getOrElse {
@@ -45,11 +53,39 @@
 
   /** @inheritdoc */
   def get(key: String, default: String): String = {
-    executeConfigRequestSingleValue { builder =>
-      builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
+    val kv = executeConfigRequestSinglePair { builder =>
+      val pairsBuilder = builder.getGetWithDefaultBuilder
+        .addPairsBuilder()
+        .setKey(key)
+      if (default != null) {
+        pairsBuilder.setValue(default)
+      }
+    }
+    if (kv.hasValue) {
+      kv.getValue
+    } else {
+      default
     }
   }
 
+  /** @inheritdoc */
+  override private[sql] def get[T](entry: ConfigEntry[T]): T = {
+    require(entry != null, "entry cannot be null")
+    entry.readFrom(reader)
+  }
+
+  /** @inheritdoc */
+  override private[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = {
+    require(entry != null, "entry cannot be null")
+    entry.readFrom(reader)
+  }
+
+  /** @inheritdoc */
+  override private[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
+    require(entry != null, "entry cannot be null")
+    Option(get(entry.key, null)).map(entry.valueConverter).getOrElse(default)
+  }
+
   /** @inheritdoc */
   def getAll: Map[String, String] = {
     val response = executeConfigRequest { builder =>
@@ -65,11 +101,11 @@
 
   /** @inheritdoc */
   def getOption(key: String): Option[String] = {
-    val pair = executeConfigRequestSinglePair { builder =>
+    val kv = executeConfigRequestSinglePair { builder =>
       builder.getGetOptionBuilder.addKeys(key)
     }
-    if (pair.hasValue) {
-      Option(pair.getValue)
+    if (kv.hasValue) {
+      Option(kv.getValue)
     } else {
       None
     }
@@ -84,17 +120,11 @@
 
   /** @inheritdoc */
   def isModifiable(key: String): Boolean = {
-    val modifiable = executeConfigRequestSingleValue { builder =>
+    val kv = executeConfigRequestSinglePair { builder =>
       builder.getIsModifiableBuilder.addKeys(key)
     }
-    java.lang.Boolean.valueOf(modifiable)
-  }
-
-  private def executeConfigRequestSingleValue(
-      f: ConfigRequest.Operation.Builder => Unit): String = {
-    val pair = executeConfigRequestSinglePair(f)
-    require(pair.hasValue, "The returned pair does not have a value set")
-    pair.getValue
+    require(kv.hasValue, "The returned pair does not have a value set")
+    java.lang.Boolean.valueOf(kv.getValue)
   }
 
   private def executeConfigRequestSinglePair(
@@ -113,4 +143,6 @@
     }
     response
   }
+
+  private val reader = new ConfigReader((key: String) => Option(self.get(key, null)))
 }
```
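
The typed overloads all funnel through the `reader` defined at the bottom of the class: `entry.readFrom(reader)` asks the `ConfigReader` for the raw string of `entry.key`, the reader delegates to `self.get(key, null)` (a single Connect config round trip with a null default), and the entry's value converter parses whatever comes back. A sketch of the effect for a caller, reusing the optional entry from the E2E test below and assuming a Connect-backed session `spark` with `private[sql]` access:

```scala
import org.apache.spark.internal.config.ConfigBuilder

val entry = ConfigBuilder("my.simple.conf").intConf.createOptional

spark.conf.get(entry)             // one config round trip; None while the key is unset
spark.conf.set(entry, Option(33)) // stored as the string "33"
spark.conf.get(entry)             // Some(33), parsed by the entry's value converter
```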

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala

Lines changed: 23 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkUpgradeException}
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -1006,8 +1007,12 @@
   test("RuntimeConfig") {
     intercept[NoSuchElementException](spark.conf.get("foo.bar"))
     assert(spark.conf.getOption("foo.bar").isEmpty)
+    assert(spark.conf.get("foo.bar", "nope") == "nope")
+    assert(spark.conf.get("foo.bar", null) == null)
     spark.conf.set("foo.bar", value = true)
     assert(spark.conf.getOption("foo.bar") === Option("true"))
+    assert(spark.conf.get("foo.bar", "nope") === "true")
+    assert(spark.conf.get("foo.bar", null) === "true")
     spark.conf.set("foo.bar.numBaz", 100L)
     assert(spark.conf.get("foo.bar.numBaz") === "100")
     spark.conf.set("foo.bar.name", "donkey")
@@ -1020,6 +1025,24 @@
     assert(spark.conf.isModifiable("spark.sql.ansi.enabled"))
     assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
     intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
+
+    val entry = ConfigBuilder("my.simple.conf").intConf.createOptional
+    intercept[NoSuchElementException](spark.conf.get(entry.key))
+    assert(spark.conf.get(entry).isEmpty)
+    assert(spark.conf.get(entry, Option(55)) === Option(55))
+    spark.conf.set(entry, Option(33))
+    assert(spark.conf.get(entry.key) === "33")
+    assert(spark.conf.get(entry) === Option(33))
+    assert(spark.conf.get(entry, Option(55)) === Option(33))
+
+    val entryWithDefault = ConfigBuilder("my.important.conf").intConf.createWithDefault(10)
+    intercept[NoSuchElementException](spark.conf.get(entryWithDefault.key))
+    assert(spark.conf.get(entryWithDefault) === 10)
+    assert(spark.conf.get(entryWithDefault, 11) === 11)
+    spark.conf.set(entryWithDefault, 12)
+    assert(spark.conf.get(entryWithDefault.key) === "12")
+    assert(spark.conf.get(entryWithDefault) === 12)
+    assert(spark.conf.get(entryWithDefault, 11) === 12)
   }
 
   test("SparkVersion") {
```
core/src/main/scala/org/apache/spark/internal/config/SparkConfigProvider.scala (new file)

Lines changed: 35 additions & 0 deletions

```diff
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.config
+
+import java.util.{Map => JMap}
+
+import org.apache.spark.SparkConf
+
+/**
+ * A config provider that only reads Spark config keys.
+ */
+private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {
+
+  override def get(key: String): Option[String] = {
+    if (key.startsWith("spark.")) {
+      Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf))
+    } else {
+      None
+    }
+  }
+}
```
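
A behavior sketch for the recreated provider; it is `private[spark]`, so this is assumed to run from within an `org.apache.spark` package:

```scala
import java.util.{HashMap => JHashMap}
import org.apache.spark.internal.config.SparkConfigProvider

val conf = new JHashMap[String, String]()
conf.put("spark.app.name", "demo")

val provider = new SparkConfigProvider(conf)
provider.get("spark.app.name") // Some("demo")
provider.get("app.name")       // None: keys outside the "spark." namespace are ignored
// Unresolved "spark." keys also consult SparkConf.getDeprecatedConfig before giving up.
```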

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 2 additions & 5 deletions
```diff
@@ -103,7 +103,8 @@ private[spark] object Utils
     with SparkErrorUtils
     with SparkFileUtils
     with SparkSerDeUtils
-    with SparkStreamUtils {
+    with SparkStreamUtils
+    with SparkStringUtils {
 
   private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
   @volatile private var cachedLocalDir: String = ""
@@ -2799,10 +2800,6 @@
     }
   }
 
-  def stringToSeq(str: String): Seq[String] = {
-    str.split(",").map(_.trim()).filter(_.nonEmpty).toImmutableArraySeq
-  }
-
   /**
    * Create instances of extension classes.
    *
```
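
Because `Utils` now mixes in `SparkStringUtils`, existing call sites keep compiling unchanged; a quick sketch:

```scala
import org.apache.spark.util.Utils

Utils.stringToSeq("a, b ,c") // Seq("a", "b", "c"), now inherited from SparkStringUtils
```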

sql/api/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala

Lines changed: 25 additions & 0 deletions
```diff
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 
 /**
  * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
@@ -53,6 +54,11 @@ abstract class RuntimeConfig {
     set(key, value.toString)
   }
 
+  /**
+   * Sets the given Spark runtime configuration property.
+   */
+  private[sql] def set[T](entry: ConfigEntry[T], value: T): Unit
+
   /**
    * Returns the value of Spark runtime configuration property for the given key. If the key is
    * not set yet, return its default value if possible, otherwise `NoSuchElementException` will be
@@ -74,6 +80,25 @@
    */
   def get(key: String, default: String): String
 
+  /**
+   * Returns the value of Spark runtime configuration property for the given key. If the key is
+   * not set yet, return `defaultValue` in [[ConfigEntry]].
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  private[sql] def get[T](entry: ConfigEntry[T]): T
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key. If the key is
+   * not set yet, return None.
+   */
+  private[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T]
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key. If the key is
+   * not set yet, return the user given `default`.
+   */
+  private[sql] def get[T](entry: ConfigEntry[T], default: T): T
+
   /**
    * Returns all properties set in this conf.
    *
```
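
The three typed `get` overloads differ in how an unset key is resolved. A sketch using the same entries as the Connect E2E test above, again assuming `private[sql]` access and a session `spark`:

```scala
import org.apache.spark.internal.config.ConfigBuilder

val optional = ConfigBuilder("my.simple.conf").intConf.createOptional
val withDefault = ConfigBuilder("my.important.conf").intConf.createWithDefault(10)

// Before either key is set:
spark.conf.get(optional)        // None: OptionalConfigEntry yields Option[T]
spark.conf.get(withDefault)     // 10:   the entry's built-in default
spark.conf.get(withDefault, 11) // 11:   a caller-supplied default takes precedence
```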
