Skip to content

Commit 5329a42

Browse files
HeartSaVioRrshkv
authored andcommitted
[SPARK-32468][SS][TESTS][FOLLOWUP] Provide "default.api.timeout.ms" as well when specifying "request.timeout.ms" on replacing "default.api.timeout.ms"
### What changes were proposed in this pull request? This patch is a follow-up to fill the gap in apache#29272 which missed to also provide `default.api.timeout.ms` as well. apache#29272 unintentionally changed the behavior on Kafka side timeout which is incompatible with the test timeout. (`default.api.timeout.ms` gets default value which is 60 seconds, longer than test timeout.) ### Why are the changes needed? We realized the PR for SPARK-32468 (apache#29272) doesn't work as we expect. See apache#29272 (comment) for more details. ### Does this PR introduce _any_ user-facing change? No, as it only touches the tests. ### How was this patch tested? Will trigger builds from Jenkins or Github Action multiple time and confirm. Closes apache#29343 from HeartSaVioR/SPARK-32468-FOLLOWUP. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent a8ca706 commit 5329a42

File tree

3 files changed

+6
-0
lines changed

3 files changed

+6
-0
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
194194
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
195195
.option("kafka.metadata.max.age.ms", "1")
196196
.option("kafka.request.timeout.ms", "3000")
197+
.option("kafka.default.api.timeout.ms", "3000")
197198
.option("subscribePattern", s"$topicPrefix-.*")
198199
.option("failOnDataLoss", "false")
199200

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaM
224224
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
225225
.option("kafka.metadata.max.age.ms", "1")
226226
.option("kafka.request.timeout.ms", "3000")
227+
.option("kafka.default.api.timeout.ms", "3000")
227228
.option("subscribePattern", "failOnDataLoss.*")
228229
.option("startingOffsets", "earliest")
229230
.option("failOnDataLoss", "false")

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
364364
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
365365
.option("kafka.metadata.max.age.ms", "1")
366366
.option("kafka.request.timeout.ms", "3000")
367+
.option("kafka.default.api.timeout.ms", "3000")
367368
.option("subscribePattern", s"$topicPrefix-.*")
368369
.option("failOnDataLoss", "false")
369370

@@ -401,6 +402,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
401402
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
402403
.option("kafka.metadata.max.age.ms", "1")
403404
.option("kafka.request.timeout.ms", "3000")
405+
.option("kafka.default.api.timeout.ms", "3000")
404406
.option("startingOffsets", "earliest")
405407
.option("subscribePattern", s"$topicPrefix-.*")
406408

@@ -590,6 +592,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
590592
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
591593
.option("kafka.metadata.max.age.ms", "1")
592594
.option("kafka.request.timeout.ms", "3000")
595+
.option("kafka.default.api.timeout.ms", "3000")
593596
.option("subscribe", topic)
594597
// If a topic is deleted and we try to poll data starting from offset 0,
595598
// the Kafka consumer will just block until timeout and return an empty result.
@@ -1861,6 +1864,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
18611864
.option("subscribePattern", "stress.*")
18621865
.option("failOnDataLoss", "false")
18631866
.option("kafka.request.timeout.ms", "3000")
1867+
.option("kafka.default.api.timeout.ms", "3000")
18641868
.load()
18651869
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
18661870
.as[(String, String)]

0 commit comments

Comments
 (0)