-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-31926][SQL][test-hive1.2] Fix concurrency issue for ThriftCLIService to getPortNumber #28751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0379d6e
72ac908
0beaa69
0a46508
edc5b0d
2d0403a
fb985e6
04f0a1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -480,7 +480,8 @@ object SparkParallelTestGrouping { | |
| "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite", | ||
| "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite", | ||
| "org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite", | ||
| "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite", | ||
| "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite", | ||
| "org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite", | ||
| "org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does that mean this approach to speed up the test runner never works for maven? cc @gengliangwang @wangyum
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's not related to maven I think.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree with @gengliangwang.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way to run these tests in individual JVMs with Maven? It seems we are not able to start 2 thrift servers with different kinds of transport modes on the shared Spark session in one JVM
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just run these 2 test suites one by one?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The root cause I found so far is: in If what I've found is the only issue that stops these tests from running together in a single JVM (verified locally and it went well), I guess we can remove these 2 lines eventually. Sent a new PR, #28797 |
||
| ) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession { | |
| private var hiveServer2: HiveThriftServer2 = _ | ||
| private var serverPort: Int = 0 | ||
|
|
||
| def mode: ServerMode.Value | ||
|
|
||
| override def beforeAll(): Unit = { | ||
| super.beforeAll() | ||
| // Retries up to 3 times with different port numbers if the server fails to start | ||
|
|
@@ -53,11 +55,17 @@ trait SharedThriftServer extends SharedSparkSession { | |
| } | ||
| } | ||
|
|
||
| protected def jdbcUri: String = if (mode == ServerMode.http) { | ||
| s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice" | ||
| } else { | ||
| s"jdbc:hive2://localhost:$serverPort" | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit format: ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the existing format is correct. |
||
|
|
||
| protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = { | ||
| val user = System.getProperty("user.name") | ||
| require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2") | ||
| val connections = | ||
| fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") } | ||
| fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") } | ||
| val statements = connections.map(_.createStatement()) | ||
|
|
||
| try { | ||
|
|
@@ -71,21 +79,33 @@ trait SharedThriftServer extends SharedSparkSession { | |
| private def startThriftServer(attempt: Int): Unit = { | ||
| logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt") | ||
| val sqlContext = spark.newSession().sqlContext | ||
| // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use. | ||
| // Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could | ||
| // randomly pick any free port to use. | ||
| // It's much more robust than set a random port generated by ourselves ahead | ||
| sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0") | ||
| hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) | ||
| hiveServer2.getServices.asScala.foreach { | ||
| case t: ThriftCLIService if t.getPortNumber != 0 => | ||
| serverPort = t.getPortNumber | ||
| logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") | ||
| case _ => | ||
| } | ||
| sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0") | ||
| sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString) | ||
|
|
||
| try { | ||
| hiveServer2 = HiveThriftServer2.startWithContext(sqlContext) | ||
| hiveServer2.getServices.asScala.foreach { | ||
| case t: ThriftCLIService => | ||
| serverPort = t.getPortNumber | ||
| logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we may not output this log?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before this fix, yes. The port binding is in another background thread.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how does this patch fix it? It seems you just added a try-catch?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With https://github.com/apache/spark/pull/28751/files#diff-7610697b4f8f1bc4842c77e50807914cR178 and its implementations, the port binding is done in the same thread where we call
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #28651 (comment) . there was a discussion with @juliuszsompolski before
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah I see!
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Take After:
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see. Nice catch. |
||
| case _ => | ||
| } | ||
|
|
||
| // Wait for thrift server to be ready to serve the query, via executing simple query | ||
| // till the query succeeds. See SPARK-30345 for more details. | ||
| eventually(timeout(30.seconds), interval(1.seconds)) { | ||
| withJdbcStatement { _.execute("SELECT 1") } | ||
| // Wait for thrift server to be ready to serve the query, via executing simple query | ||
| // till the query succeeds. See SPARK-30345 for more details. | ||
| eventually(timeout(30.seconds), interval(1.seconds)) { | ||
| withJdbcStatement { _.execute("SELECT 1") } | ||
| } | ||
| } catch { | ||
| case e: Exception => | ||
| logError("Error start hive server with Context ", e) | ||
| if (hiveServer2 != null) { | ||
| hiveServer2.stop() | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql.hive.thriftserver | ||
|
|
||
| class ThriftServerWithSparkContextSuite extends SharedThriftServer { | ||
| trait ThriftServerWithSparkContextSuite extends SharedThriftServer { | ||
|
|
||
| test("SPARK-29911: Uncache cached tables when session closed") { | ||
| val cacheManager = spark.sharedState.cacheManager | ||
|
|
@@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer { | |
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new test suite never passes. |
||
| override def mode: ServerMode.Value = ServerMode.binary | ||
| } | ||
|
|
||
| class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite { | ||
| override def mode: ServerMode.Value = ServerMode.http | ||
| } | ||

Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like an SBT-only workaround, doesn't it?