Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ public void close() throws HiveSQLException {
opHandleSet.clear();
// Cleanup session log directory.
cleanupSessionLogDir();
// Cleanup pipeout file.
cleanupPipeoutFile();
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
Expand All @@ -665,6 +667,22 @@ public void close() throws HiveSQLException {
}
}

private void cleanupPipeoutFile() {
String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);

File[] fileAry = new File(lScratchDir).listFiles(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: There is an overloaded version of listfiles.
Using it saves 1 line.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry i did not find the overloaded version of listfiles which saves 1 line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem. I hope this works.

File[] fileAry = new File(lScratchDir).listFiles(
      (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));

I think it would be good if you would have one unit test for your change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@attilapiros will fix it, thanks!

(dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));

for (File file : fileAry) {
try {
FileUtils.forceDelete(file);
} catch (Exception e) {
LOG.error("Failed to cleanup pipeout file: " + file, e);
}
}
}

private void cleanupSessionLogDir() {
if (isOperationLogEnabled) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.io.{File, FilenameFilter}
import java.net.URL
import java.nio.charset.StandardCharsets
import java.sql.{Date, DriverManager, SQLException, Statement}
import java.util.UUID

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
bufferSrc.close()
}
}

test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
  // Returns the pipeout files in the local scratch dir that belong to the
  // given session (name prefixed with the session ID, ".pipeout" suffix).
  def sessionPipeoutFiles(sessionID: UUID): Array[File] = {
    val pipeoutFilter = new FilenameFilter {
      override def accept(dir: File, name: String): Boolean =
        name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
    }
    lScratchDir.listFiles(pipeoutFilter)
  }

  withCLIServiceClient { client =>
    val user = System.getProperty("user.name")
    val sessionHandle = client.openSession(user, "")
    val sessionID = sessionHandle.getSessionId

    // Opening a session creates exactly one pipeout file for it.
    assert(sessionPipeoutFiles(sessionID).length == 1)

    client.closeSession(sessionHandle)

    // Closing the session must remove that pipeout file again.
    assert(sessionPipeoutFiles(sessionID).length == 0)
  }
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
Expand Down Expand Up @@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private val pidDir: File = Utils.createTempDir("thriftserver-pid")
protected var logPath: File = _
protected var operationLogPath: File = _
protected var lScratchDir: File = _
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

Expand Down Expand Up @@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
| --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
| --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
| --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
| --hiveconf $portConf=$port
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
Expand Down Expand Up @@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
metastorePath.delete()
operationLogPath = Utils.createTempDir()
operationLogPath.delete()
lScratchDir = Utils.createTempDir()
lScratchDir.delete()
logPath = null
logTailingProcess = null

Expand Down Expand Up @@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
operationLogPath.delete()
operationLogPath = null

lScratchDir.delete()
lScratchDir = null

Option(logPath).foreach(_.delete())
logPath = null

Expand Down