Skip to content

Commit 4b625bd

Browse files
yaooqinn authored and cloud-fan committed
[SPARK-31926][SQL][TEST-HIVE1.2] Fix concurrency issue for ThriftCLIService to getPortNumber
### What changes were proposed in this pull request? When `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext` is called, it starts `ThriftCLIService` in the background with a new Thread, so if we call `ThriftCLIService.getPortNumber` at the same time, we might not get the bound port if it's configured with 0. This PR moves the TServer/HttpServer initialization code out of that new Thread. ### Why are the changes needed? Fix concurrency issue, improve test robustness. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new tests. Closes #28751 from yaooqinn/SPARK-31926. Authored-by: Kent Yao <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 02f32cf) Signed-off-by: Wenchen Fan <[email protected]>
1 parent c3e5cd2 commit 4b625bd

File tree

10 files changed

+104
-29
lines changed

10 files changed

+104
-29
lines changed

project/SparkBuild.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,8 @@ object SparkParallelTestGrouping {
479479
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
480480
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
481481
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
482-
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
482+
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInHttpSuite",
483+
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextInBinarySuite",
483484
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
484485
)
485486

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ trait SharedThriftServer extends SharedSparkSession {
3333
private var hiveServer2: HiveThriftServer2 = _
3434
private var serverPort: Int = 0
3535

36+
def mode: ServerMode.Value
37+
3638
override def beforeAll(): Unit = {
3739
super.beforeAll()
3840
// Retries up to 3 times with different port numbers if the server fails to start
@@ -53,11 +55,17 @@ trait SharedThriftServer extends SharedSparkSession {
5355
}
5456
}
5557

58+
protected def jdbcUri: String = if (mode == ServerMode.http) {
59+
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
60+
} else {
61+
s"jdbc:hive2://localhost:$serverPort"
62+
}
63+
5664
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
5765
val user = System.getProperty("user.name")
5866
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
5967
val connections =
60-
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
68+
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
6169
val statements = connections.map(_.createStatement())
6270

6371
try {
@@ -71,21 +79,33 @@ trait SharedThriftServer extends SharedSparkSession {
7179
private def startThriftServer(attempt: Int): Unit = {
7280
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
7381
val sqlContext = spark.newSession().sqlContext
74-
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
82+
// Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could
83+
// randomly pick any free port to use.
7584
// It's much more robust than set a random port generated by ourselves ahead
7685
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
77-
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
78-
hiveServer2.getServices.asScala.foreach {
79-
case t: ThriftCLIService if t.getPortNumber != 0 =>
80-
serverPort = t.getPortNumber
81-
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
82-
case _ =>
83-
}
86+
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
87+
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
88+
89+
try {
90+
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
91+
hiveServer2.getServices.asScala.foreach {
92+
case t: ThriftCLIService =>
93+
serverPort = t.getPortNumber
94+
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
95+
case _ =>
96+
}
8497

85-
// Wait for thrift server to be ready to serve the query, via executing simple query
86-
// till the query succeeds. See SPARK-30345 for more details.
87-
eventually(timeout(30.seconds), interval(1.seconds)) {
88-
withJdbcStatement { _.execute("SELECT 1") }
98+
// Wait for thrift server to be ready to serve the query, via executing simple query
99+
// till the query succeeds. See SPARK-30345 for more details.
100+
eventually(timeout(30.seconds), interval(1.seconds)) {
101+
withJdbcStatement { _.execute("SELECT 1") }
102+
}
103+
} catch {
104+
case e: Exception =>
105+
logError("Error start hive server with Context ", e)
106+
if (hiveServer2 != null) {
107+
hiveServer2.stop()
108+
}
89109
}
90110
}
91111
}

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ import org.apache.spark.sql.types._
5454
*/
5555
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
5656

57+
58+
override def mode: ServerMode.Value = ServerMode.binary
59+
5760
override protected def testFile(fileName: String): String = {
5861
val url = Thread.currentThread().getContextClassLoader.getResource(fileName)
5962
// Copy to avoid URISyntaxException during accessing the resources in `sql/core`

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
class ThriftServerWithSparkContextSuite extends SharedThriftServer {
20+
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
2121

2222
test("SPARK-29911: Uncache cached tables when session closed") {
2323
val cacheManager = spark.sharedState.cacheManager
@@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
4242
}
4343
}
4444
}
45+
46+
47+
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {
48+
override def mode: ServerMode.Value = ServerMode.binary
49+
}
50+
51+
class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite {
52+
override def mode: ServerMode.Value = ServerMode.http
53+
}

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hive.conf.HiveConf;
2929
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
3030
import org.apache.hadoop.hive.shims.ShimLoader;
31+
import org.apache.hive.service.ServiceException;
3132
import org.apache.hive.service.auth.HiveAuthFactory;
3233
import org.apache.hive.service.cli.CLIService;
3334
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -45,7 +46,7 @@ public ThriftBinaryCLIService(CLIService cliService) {
4546
}
4647

4748
@Override
48-
public void run() {
49+
protected void initializeServer() {
4950
try {
5051
// Server thread pool
5152
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -100,6 +101,14 @@ public void run() {
100101
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
101102
+ serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
102103
LOG.info(msg);
104+
} catch (Exception t) {
105+
throw new ServiceException("Error initializing " + getName(), t);
106+
}
107+
}
108+
109+
@Override
110+
public void run() {
111+
try {
103112
server.serve();
104113
} catch (Throwable t) {
105114
LOG.fatal(

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public synchronized void init(HiveConf hiveConf) {
175175
public synchronized void start() {
176176
super.start();
177177
if (!isStarted && !isEmbedded) {
178+
initializeServer();
178179
new Thread(this).start();
179180
isStarted = true;
180181
}
@@ -633,6 +634,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
633634
return resp;
634635
}
635636

637+
protected abstract void initializeServer();
638+
636639
@Override
637640
public abstract void run();
638641

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hive.shims.ShimLoader;
2929
import org.apache.hadoop.security.UserGroupInformation;
3030
import org.apache.hadoop.util.Shell;
31+
import org.apache.hive.service.ServiceException;
3132
import org.apache.hive.service.auth.HiveAuthFactory;
3233
import org.apache.hive.service.cli.CLIService;
3334
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
@@ -53,13 +54,8 @@ public ThriftHttpCLIService(CLIService cliService) {
5354
super(cliService, ThriftHttpCLIService.class.getSimpleName());
5455
}
5556

56-
/**
57-
* Configure Jetty to serve http requests. Example of a client connection URL:
58-
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
59-
* e.g. http://gateway:port/hive2/servlets/thrifths2/
60-
*/
6157
@Override
62-
public void run() {
58+
protected void initializeServer() {
6359
try {
6460
// Server thread pool
6561
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
@@ -150,6 +146,19 @@ public void run() {
150146
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
151147
+ maxWorkerThreads + " worker threads";
152148
LOG.info(msg);
149+
} catch (Exception t) {
150+
throw new ServiceException("Error initializing " + getName(), t);
151+
}
152+
}
153+
154+
/**
155+
* Configure Jetty to serve http requests. Example of a client connection URL:
156+
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
157+
* e.g. http://gateway:port/hive2/servlets/thrifths2/
158+
*/
159+
@Override
160+
public void run() {
161+
try {
153162
httpServer.join();
154163
} catch (Throwable t) {
155164
LOG.fatal(

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.hive.conf.HiveConf;
3030
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
3131
import org.apache.hadoop.hive.shims.ShimLoader;
32+
import org.apache.hive.service.ServiceException;
3233
import org.apache.hive.service.auth.HiveAuthFactory;
3334
import org.apache.hive.service.cli.CLIService;
3435
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -46,7 +47,7 @@ public ThriftBinaryCLIService(CLIService cliService) {
4647
}
4748

4849
@Override
49-
public void run() {
50+
protected void initializeServer() {
5051
try {
5152
// Server thread pool
5253
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -101,6 +102,14 @@ public void run() {
101102
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
102103
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
103104
LOG.info(msg);
105+
} catch (Exception t) {
106+
throw new ServiceException("Error initializing " + getName(), t);
107+
}
108+
}
109+
110+
@Override
111+
public void run() {
112+
try {
104113
server.serve();
105114
} catch (Throwable t) {
106115
LOG.error(

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) {
176176
public synchronized void start() {
177177
super.start();
178178
if (!isStarted && !isEmbedded) {
179+
initializeServer();
179180
new Thread(this).start();
180181
isStarted = true;
181182
}
@@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
670671
return resp;
671672
}
672673

674+
protected abstract void initializeServer();
675+
673676
@Override
674677
public abstract void run();
675678

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hive.shims.ShimLoader;
2929
import org.apache.hadoop.security.UserGroupInformation;
3030
import org.apache.hadoop.util.Shell;
31+
import org.apache.hive.service.ServiceException;
3132
import org.apache.hive.service.auth.HiveAuthFactory;
3233
import org.apache.hive.service.cli.CLIService;
3334
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -54,13 +55,8 @@ public ThriftHttpCLIService(CLIService cliService) {
5455
super(cliService, ThriftHttpCLIService.class.getSimpleName());
5556
}
5657

57-
/**
58-
* Configure Jetty to serve http requests. Example of a client connection URL:
59-
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
60-
* e.g. http://gateway:port/hive2/servlets/thrifths2/
61-
*/
6258
@Override
63-
public void run() {
59+
protected void initializeServer() {
6460
try {
6561
// Server thread pool
6662
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
@@ -151,6 +147,19 @@ public void run() {
151147
+ " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
152148
+ maxWorkerThreads + " worker threads";
153149
LOG.info(msg);
150+
} catch (Exception t) {
151+
throw new ServiceException("Error initializing " + getName(), t);
152+
}
153+
}
154+
155+
/**
156+
* Configure Jetty to serve http requests. Example of a client connection URL:
157+
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
158+
* e.g. http://gateway:port/hive2/servlets/thrifths2/
159+
*/
160+
@Override
161+
public void run() {
162+
try {
154163
httpServer.join();
155164
} catch (Throwable t) {
156165
LOG.error(

0 commit comments

Comments
 (0)