Skip to content

Commit a0187cd

Browse files
yaooqinn and cloud-fan
authored and committed
[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue for ThriftCLIService to getPortNumber
### What changes were proposed in this pull request? This PR brings back 02f32cf, which was reverted by 4a25200 because of a Maven test failure. Changes newly made on top of the original commit: 1. Add a missing log4j file to the test resources. 2. Call `SessionState.detachSession()` in `afterAll` to clean up the thread-local session state. 3. Do not use a dedicated JVM for this suite in the sbt test runner either. ### Why are the changes needed? To fix the Maven test failure. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new tests. Closes #28797 from yaooqinn/SPARK-31926-NEW. Authored-by: Kent Yao <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 8282bbf commit a0187cd

File tree

11 files changed

+170
-30
lines changed

11 files changed

+170
-30
lines changed

project/SparkBuild.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,6 @@ object SparkParallelTestGrouping {
480480
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
481481
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
482482
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
483-
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
484483
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
485484
)
486485

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# Set everything to be logged to the file hive-thriftserver/target/unit-tests.log
19+
log4j.rootLogger=DEBUG, CA, FA
20+
21+
#Console Appender
22+
log4j.appender.CA=org.apache.log4j.ConsoleAppender
23+
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
24+
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
25+
log4j.appender.CA.Threshold = WARN
26+
27+
28+
#File Appender
29+
log4j.appender.FA=org.apache.log4j.FileAppender
30+
log4j.appender.FA.append=false
31+
log4j.appender.FA.file=target/unit-tests.log
32+
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
33+
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
34+
35+
# Set the logger level of File Appender to DEBUG
36+
log4j.appender.FA.Threshold = DEBUG
37+
38+
# Some packages are noisy for no good reason.
39+
log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
40+
log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
41+
42+
log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
43+
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
44+
45+
log4j.additivity.hive.log=false
46+
log4j.logger.hive.log=OFF
47+
48+
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
49+
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
50+
51+
log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
52+
log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
53+
54+
log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
55+
log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
56+
57+
log4j.additivity.hive.ql.metadata.Hive=false
58+
log4j.logger.hive.ql.metadata.Hive=OFF
59+
60+
log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
61+
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
62+
63+
# Parquet related logging
64+
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
65+
log4j.logger.parquet.CorruptStatistics=ERROR

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.concurrent.duration._
2424
import scala.util.Try
2525

2626
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
27+
import org.apache.hadoop.hive.ql.session.SessionState
2728
import org.apache.hive.service.cli.thrift.ThriftCLIService
2829

2930
import org.apache.spark.sql.test.SharedSparkSession
@@ -33,6 +34,8 @@ trait SharedThriftServer extends SharedSparkSession {
3334
private var hiveServer2: HiveThriftServer2 = _
3435
private var serverPort: Int = 0
3536

37+
def mode: ServerMode.Value
38+
3639
override def beforeAll(): Unit = {
3740
super.beforeAll()
3841
// Retries up to 3 times with different port numbers if the server fails to start
@@ -50,14 +53,21 @@ trait SharedThriftServer extends SharedSparkSession {
5053
hiveServer2.stop()
5154
} finally {
5255
super.afterAll()
56+
SessionState.detachSession()
5357
}
5458
}
5559

60+
protected def jdbcUri: String = if (mode == ServerMode.http) {
61+
s"jdbc:hive2://localhost:$serverPort/default;transportMode=http;httpPath=cliservice"
62+
} else {
63+
s"jdbc:hive2://localhost:$serverPort/"
64+
}
65+
5666
protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
5767
val user = System.getProperty("user.name")
5868
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
5969
val connections =
60-
fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
70+
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
6171
val statements = connections.map(_.createStatement())
6272

6373
try {
@@ -69,23 +79,35 @@ trait SharedThriftServer extends SharedSparkSession {
6979
}
7080

7181
private def startThriftServer(attempt: Int): Unit = {
72-
logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
82+
logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt")
7383
val sqlContext = spark.newSession().sqlContext
74-
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
84+
// Set the HIVE_SERVER2_THRIFT_PORT and HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could
85+
// randomly pick any free port to use.
7586
// This is much more robust than setting a random port that we generate ourselves ahead of time.
7687
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
77-
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
78-
hiveServer2.getServices.asScala.foreach {
79-
case t: ThriftCLIService if t.getPortNumber != 0 =>
80-
serverPort = t.getPortNumber
81-
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
82-
case _ =>
83-
}
88+
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
89+
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
90+
91+
try {
92+
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
93+
hiveServer2.getServices.asScala.foreach {
94+
case t: ThriftCLIService =>
95+
serverPort = t.getPortNumber
96+
logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort, attempt=$attempt")
97+
case _ =>
98+
}
8499

85-
// Wait for thrift server to be ready to serve the query, via executing simple query
86-
// till the query succeeds. See SPARK-30345 for more details.
87-
eventually(timeout(30.seconds), interval(1.seconds)) {
88-
withJdbcStatement { _.execute("SELECT 1") }
100+
// Wait for thrift server to be ready to serve the query, via executing simple query
101+
// till the query succeeds. See SPARK-30345 for more details.
102+
eventually(timeout(30.seconds), interval(1.seconds)) {
103+
withJdbcStatement { _.execute("SELECT 1") }
104+
}
105+
} catch {
106+
case e: Exception =>
107+
logError("Error start hive server with Context ", e)
108+
if (hiveServer2 != null) {
109+
hiveServer2.stop()
110+
}
89111
}
90112
}
91113
}

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ import org.apache.spark.sql.types._
5454
*/
5555
class ThriftServerQueryTestSuite extends SQLQueryTestSuite with SharedThriftServer {
5656

57+
58+
override def mode: ServerMode.Value = ServerMode.binary
59+
5760
override protected def testFile(fileName: String): String = {
5861
val url = Thread.currentThread().getContextClassLoader.getResource(fileName)
5962
// Copy to avoid URISyntaxException during accessing the resources in `sql/core`

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
class ThriftServerWithSparkContextSuite extends SharedThriftServer {
20+
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
2121

2222
test("SPARK-29911: Uncache cached tables when session closed") {
2323
val cacheManager = spark.sharedState.cacheManager
@@ -42,3 +42,12 @@ class ThriftServerWithSparkContextSuite extends SharedThriftServer {
4242
}
4343
}
4444
}
45+
46+
47+
class ThriftServerWithSparkContextInBinarySuite extends ThriftServerWithSparkContextSuite {
48+
override def mode: ServerMode.Value = ServerMode.binary
49+
}
50+
51+
class ThriftServerWithSparkContextInHttpSuite extends ThriftServerWithSparkContextSuite {
52+
override def mode: ServerMode.Value = ServerMode.http
53+
}

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hive.conf.HiveConf;
2929
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
3030
import org.apache.hadoop.hive.shims.ShimLoader;
31+
import org.apache.hive.service.ServiceException;
3132
import org.apache.hive.service.auth.HiveAuthFactory;
3233
import org.apache.hive.service.cli.CLIService;
3334
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -45,7 +46,7 @@ public ThriftBinaryCLIService(CLIService cliService) {
4546
}
4647

4748
@Override
48-
public void run() {
49+
protected void initializeServer() {
4950
try {
5051
// Server thread pool
5152
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -100,6 +101,14 @@ public void run() {
100101
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
101102
+ serverSocket.getServerSocket().getLocalPort() + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
102103
LOG.info(msg);
104+
} catch (Exception t) {
105+
throw new ServiceException("Error initializing " + getName(), t);
106+
}
107+
}
108+
109+
@Override
110+
public void run() {
111+
try {
103112
server.serve();
104113
} catch (Throwable t) {
105114
LOG.fatal(

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public synchronized void init(HiveConf hiveConf) {
175175
public synchronized void start() {
176176
super.start();
177177
if (!isStarted && !isEmbedded) {
178+
initializeServer();
178179
new Thread(this).start();
179180
isStarted = true;
180181
}
@@ -633,6 +634,8 @@ public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException {
633634
return resp;
634635
}
635636

637+
protected abstract void initializeServer();
638+
636639
@Override
637640
public abstract void run();
638641

sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.hive.shims.ShimLoader;
2929
import org.apache.hadoop.security.UserGroupInformation;
3030
import org.apache.hadoop.util.Shell;
31+
import org.apache.hive.service.ServiceException;
3132
import org.apache.hive.service.auth.HiveAuthFactory;
3233
import org.apache.hive.service.cli.CLIService;
3334
import org.apache.hive.service.cli.thrift.TCLIService.Iface;
@@ -53,13 +54,8 @@ public ThriftHttpCLIService(CLIService cliService) {
5354
super(cliService, ThriftHttpCLIService.class.getSimpleName());
5455
}
5556

56-
/**
57-
* Configure Jetty to serve http requests. Example of a client connection URL:
58-
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
59-
* e.g. http://gateway:port/hive2/servlets/thrifths2/
60-
*/
6157
@Override
62-
public void run() {
58+
protected void initializeServer() {
6359
try {
6460
// Server thread pool
6561
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
@@ -150,6 +146,19 @@ public void run() {
150146
+ " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
151147
+ maxWorkerThreads + " worker threads";
152148
LOG.info(msg);
149+
} catch (Exception t) {
150+
throw new ServiceException("Error initializing " + getName(), t);
151+
}
152+
}
153+
154+
/**
155+
* Configure Jetty to serve http requests. Example of a client connection URL:
156+
* http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
157+
* e.g. http://gateway:port/hive2/servlets/thrifths2/
158+
*/
159+
@Override
160+
public void run() {
161+
try {
153162
httpServer.join();
154163
} catch (Throwable t) {
155164
LOG.fatal(

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.hive.conf.HiveConf;
3030
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
3131
import org.apache.hadoop.hive.shims.ShimLoader;
32+
import org.apache.hive.service.ServiceException;
3233
import org.apache.hive.service.auth.HiveAuthFactory;
3334
import org.apache.hive.service.cli.CLIService;
3435
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
@@ -46,7 +47,7 @@ public ThriftBinaryCLIService(CLIService cliService) {
4647
}
4748

4849
@Override
49-
public void run() {
50+
protected void initializeServer() {
5051
try {
5152
// Server thread pool
5253
String threadPoolName = "HiveServer2-Handler-Pool";
@@ -101,6 +102,14 @@ public void run() {
101102
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
102103
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
103104
LOG.info(msg);
105+
} catch (Exception t) {
106+
throw new ServiceException("Error initializing " + getName(), t);
107+
}
108+
}
109+
110+
@Override
111+
public void run() {
112+
try {
104113
server.serve();
105114
} catch (Throwable t) {
106115
LOG.error(

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ public synchronized void init(HiveConf hiveConf) {
176176
public synchronized void start() {
177177
super.start();
178178
if (!isStarted && !isEmbedded) {
179+
initializeServer();
179180
new Thread(this).start();
180181
isStarted = true;
181182
}
@@ -670,6 +671,8 @@ public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq req)
670671
return resp;
671672
}
672673

674+
protected abstract void initializeServer();
675+
673676
@Override
674677
public abstract void run();
675678

0 commit comments

Comments
 (0)