Skip to content

Commit 0f78b51

Browse files
zuotingbingmstewart141
authored andcommitted
[SPARK-23547][SQL] Cleanup the .pipeout file when the Hive Session closed
## What changes were proposed in this pull request? ![2018-03-07_121010](https://user-images.githubusercontent.com/24823338/37073232-922e10d2-2200-11e8-8172-6e03aa984b39.png) when the hive session closed, we should also cleanup the .pipeout file. ## How was this patch tested? Added test cases. Author: zuotingbing <[email protected]> Closes apache#20702 from zuotingbing/SPARK-23547.
1 parent 5348c55 commit 0f78b51

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,8 @@ public void close() throws HiveSQLException {
641641
opHandleSet.clear();
642642
// Cleanup session log directory.
643643
cleanupSessionLogDir();
644+
// Cleanup pipeout file.
645+
cleanupPipeoutFile();
644646
HiveHistory hiveHist = sessionState.getHiveHistory();
645647
if (null != hiveHist) {
646648
hiveHist.closeStream();
@@ -665,6 +667,22 @@ public void close() throws HiveSQLException {
665667
}
666668
}
667669

670+
private void cleanupPipeoutFile() {
671+
String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
672+
String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
673+
674+
File[] fileAry = new File(lScratchDir).listFiles(
675+
(dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
676+
677+
for (File file : fileAry) {
678+
try {
679+
FileUtils.forceDelete(file);
680+
} catch (Exception e) {
681+
LOG.error("Failed to cleanup pipeout file: " + file, e);
682+
}
683+
}
684+
}
685+
668686
private void cleanupSessionLogDir() {
669687
if (isOperationLogEnabled) {
670688
try {

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.spark.sql.hive.thriftserver
1919

20-
import java.io.File
20+
import java.io.{File, FilenameFilter}
2121
import java.net.URL
2222
import java.nio.charset.StandardCharsets
2323
import java.sql.{Date, DriverManager, SQLException, Statement}
24+
import java.util.UUID
2425

2526
import scala.collection.mutable
2627
import scala.collection.mutable.ArrayBuffer
@@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
613614
bufferSrc.close()
614615
}
615616
}
617+
618+
test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
619+
def pipeoutFileList(sessionID: UUID): Array[File] = {
620+
lScratchDir.listFiles(new FilenameFilter {
621+
override def accept(dir: File, name: String): Boolean = {
622+
name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
623+
}
624+
})
625+
}
626+
627+
withCLIServiceClient { client =>
628+
val user = System.getProperty("user.name")
629+
val sessionHandle = client.openSession(user, "")
630+
val sessionID = sessionHandle.getSessionId
631+
632+
assert(pipeoutFileList(sessionID).length == 1)
633+
634+
client.closeSession(sessionHandle)
635+
636+
assert(pipeoutFileList(sessionID).length == 0)
637+
}
638+
}
616639
}
617640

618641
class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
807830
private val pidDir: File = Utils.createTempDir(namePrefix = "thriftserver-pid")
808831
protected var logPath: File = _
809832
protected var operationLogPath: File = _
833+
protected var lScratchDir: File = _
810834
private var logTailingProcess: Process = _
811835
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
812836

@@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
844868
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
845869
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
846870
| --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
871+
| --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
847872
| --hiveconf $portConf=$port
848873
| --driver-class-path $driverClassPath
849874
| --driver-java-options -Dlog4j.debug
@@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
873898
metastorePath.delete()
874899
operationLogPath = Utils.createTempDir()
875900
operationLogPath.delete()
901+
lScratchDir = Utils.createTempDir()
902+
lScratchDir.delete()
876903
logPath = null
877904
logTailingProcess = null
878905

@@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
956983
operationLogPath.delete()
957984
operationLogPath = null
958985

986+
lScratchDir.delete()
987+
lScratchDir = null
988+
959989
Option(logPath).foreach(_.delete())
960990
logPath = null
961991

0 commit comments

Comments
 (0)