@@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.nio.file.FileVisitResult;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -74,6 +75,34 @@ public static void deleteQuietly(File file) {
}
}

/** Copy src to the target directory simply. File attribute times are not copied. */
public static void copyDirectory(File src, File dst) throws IOException {
Review comment (Contributor, @LuciferYang): If src is a File, it seems that a file-to-file copy will occur regardless of whether the target exists, and no error will be reported.

Reply (Member Author): Thank you, @LuciferYang. I addressed your comment.

if (src == null || dst == null || !src.exists() || !src.isDirectory() ||
(dst.exists() && !dst.isDirectory())) {
throw new IllegalArgumentException("Invalid input file " + src + " or directory " + dst);
}
Path from = src.toPath().toAbsolutePath().normalize();
Path to = dst.toPath().toAbsolutePath().normalize();
if (to.startsWith(from)) {
throw new IllegalArgumentException("Cannot copy directory to itself or its subdirectory");
}
Reply (Member Author): Here, I addressed your comment, @LuciferYang.

Files.createDirectories(to);
Files.walkFileTree(from, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
Files.createDirectories(to.resolve(from.relativize(dir)));
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.copy(file, to.resolve(from.relativize(file)), StandardCopyOption.REPLACE_EXISTING);
return FileVisitResult.CONTINUE;
}
});
}

/** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
public static int nonNegativeHash(Object obj) {
if (obj == null) { return 0; }
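
For reference, a minimal sketch of how the new helper is expected to behave when called from Scala. This is an illustration only: it assumes the JavaUtils class above is the usual org.apache.spark.network.util.JavaUtils, and the object, directory, and file names are made up.

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.network.util.JavaUtils

object CopyDirectorySketch {
  def main(args: Array[String]): Unit = {
    // Set up a small source tree and an empty destination directory.
    val src = Files.createTempDirectory("copy-src").toFile
    val dst = Files.createTempDirectory("copy-dst").toFile
    Files.write(new File(src, "data.txt").toPath, "hello".getBytes(StandardCharsets.UTF_8))

    // Recursive copy: subdirectories are created as needed, existing files are replaced,
    // and file attribute times are not preserved.
    JavaUtils.copyDirectory(src, dst)
    assert(new File(dst, "data.txt").exists())

    // Copying a directory into itself or one of its subdirectories is rejected up front.
    try {
      JavaUtils.copyDirectory(src, new File(src, "nested"))
      assert(false, "expected IllegalArgumentException")
    } catch {
      case _: IllegalArgumentException => () // expected
    }
  }
}
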
@@ -147,6 +147,11 @@ private[spark] trait SparkFileUtils extends Logging {
}.toFile
}

/** Copy src to the target directory simply. File attribute times are not copied. */
def copyDirectory(src: File, dir: File): Unit = {
JavaUtils.copyDirectory(src, dir)
}

/** Copy file to the target directory simply. File attribute times are not copied. */
def copyFileToDirectory(file: File, dir: File): Unit = {
if (file == null || dir == null || !file.exists() || (dir.exists() && !dir.isDirectory())) {
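
The remaining hunks are a mechanical migration of call sites from commons-io to this helper. A typical before/after looks like the sketch below; the resource path and object name are illustrative, and Utils is assumed to be org.apache.spark.util.Utils, which exposes the SparkFileUtils method above.

import java.io.File

import org.apache.spark.util.Utils

object CheckpointCopySketch {
  def main(args: Array[String]): Unit = {
    val resourceDir = new File("src/test/resources/some-checkpoint") // illustrative path
    val checkpointDir = Utils.createTempDir().getCanonicalFile
    // Before: FileUtils.copyDirectory(resourceDir, checkpointDir)  (org.apache.commons.io.FileUtils)
    // After: the same call through Spark's own utility, so the commons-io import can be dropped.
    Utils.copyDirectory(resourceDir, checkpointDir)
  }
}
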
@@ -29,7 +29,6 @@ import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.util.Random

import org.apache.commons.io.FileUtils
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -1729,7 +1728,7 @@ abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBa
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

testStream(query)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
@@ -21,7 +21,6 @@ import java.io.File
import java.nio.charset.StandardCharsets

import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
@@ -173,7 +172,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
val dbBackend = DBBackend.byName(dbBackendName)
logWarning(s"Use ${dbBackend.name()} as the implementation of " +
s"${SHUFFLE_SERVICE_DB_BACKEND.key}")
FileUtils.copyDirectory(registeredExecFile, execStateCopy)
Utils.copyDirectory(registeredExecFile, execStateCopy)
assert(!ShuffleTestAccessor
.reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
}
5 changes: 5 additions & 0 deletions scalastyle-config.xml
@@ -312,6 +312,11 @@ This file is divided into 3 sections:
<customMessage>Use copyFileToDirectory of SparkFileUtils or Utils instead.</customMessage>
</check>

<check customId="copyDirectory" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex"> FileUtils\.copyDirectory</parameter></parameters>
<customMessage>Use copyDirectory of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
</check>

<check customId="commonslang2" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
<customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
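
One detail of the new copyDirectory check above: its regex, " FileUtils\.copyDirectory" with a leading space, appears deliberately narrow. It flags the commons-io call while leaving the recommended replacements alone, because SparkFileUtils.copyDirectory, JavaUtils.copyDirectory, and Utils.copyDirectory never contain "FileUtils" immediately preceded by a space. A rough illustration, assuming scalastyle's RegexChecker simply scans source text for the pattern:

  FileUtils.copyDirectory(srcDir, dstDir)       // matched: fails the build with the message above
  SparkFileUtils.copyDirectory(srcDir, dstDir)  // not matched
  Utils.copyDirectory(srcDir, dstDir)           // not matched

This mirrors the existing copyFileToDirectory check directly above it.
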
@@ -18,8 +18,6 @@ package org.apache.spark.sql.connect.client

import java.nio.file.Paths

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.connect.test.ConnectFunSuite
import org.apache.spark.util.SparkFileUtils

@@ -32,7 +30,7 @@ class ClassFinderSuite extends ConnectFunSuite {
requiredClasses.foreach(className =>
assume(classResourcePath.resolve(className).toFile.exists))
val copyDir = SparkFileUtils.createTempDir().toPath
FileUtils.copyDirectory(classResourcePath.toFile, copyDir.toFile)
SparkFileUtils.copyDirectory(classResourcePath.toFile, copyDir.toFile)
val monitor = new REPLClassDirMonitor(copyDir.toAbsolutePath.toString)

def checkClasses(monitor: REPLClassDirMonitor, additionalClasses: Seq[String] = Nil): Unit = {
@@ -340,7 +340,7 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
val sparkContext = session.sparkContext
val newArtifactManager = new ArtifactManager(newSession)
if (artifactPath.toFile.exists()) {
FileUtils.copyDirectory(artifactPath.toFile, newArtifactManager.artifactPath.toFile)
Utils.copyDirectory(artifactPath.toFile, newArtifactManager.artifactPath.toFile)
}
val blockManager = sparkContext.env.blockManager
val newBlockIds = cachedBlockIdList.asScala.map { blockId =>
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.HashSet
import scala.concurrent.duration._

import org.apache.commons.io.FileUtils

import org.apache.spark.{CleanerListener, SparkRuntimeException}
import org.apache.spark.executor.DataReadMethod._
import org.apache.spark.executor.DataReadMethod.DataReadMethod
@@ -1405,7 +1403,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
.filter(_.startsWith("Location:"))
.head
.replace("Location: file:", "")
FileUtils.copyDirectory(
Utils.copyDirectory(
new File(part0Loc),
new File(part0Loc.replace("part=0", "part=1")))

@@ -56,7 +56,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
assume(artifactPath.resolve("smallClassFile.class").toFile.exists)

val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("smallClassFile.class")
assert(stagingPath.toFile.exists())
val remotePath = Paths.get("classes/smallClassFile.class")
@@ -72,7 +72,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
assume(artifactPath.resolve("Hello.class").toFile.exists)

val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("Hello.class")
assert(stagingPath.toFile.exists())
val remotePath = Paths.get("classes/Hello.class")
@@ -98,7 +98,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
assume(artifactPath.resolve("Hello.class").toFile.exists)

val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("Hello.class")
assert(stagingPath.toFile.exists())
val remotePath = Paths.get("classes/Hello.class")
@@ -180,7 +180,7 @@ class ArtifactManagerSuite extends SharedSparkSession {

val copyDir = Utils.createTempDir().toPath
val destFSDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("smallClassFile.class")
val remotePath = Paths.get("forward_to_fs", destFSDir.toString, "smallClassFileCopied.class")
assert(stagingPath.toFile.exists())
@@ -203,7 +203,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
val blockId = CacheId(spark.sessionUUID, "abc")
// Setup artifact dir
val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
try {
artifactManager.addArtifact(remotePath, stagingPath, None)
val stagingPathFile = copyDir.resolve("smallClassFile.class")
@@ -246,7 +246,7 @@ class ArtifactManagerSuite extends SharedSparkSession {

def addHelloClass(session: SparkSession): Unit = {
val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("Hello.class")
val remotePath = Paths.get("classes/Hello.class")
assert(stagingPath.toFile.exists())
@@ -299,7 +299,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
assume(artifactPath.resolve("Hello.class").toFile.exists)

val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("Hello.class")
val remotePath = Paths.get("classes/Hello.class")

@@ -368,7 +368,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
val copyDir = Utils.createTempDir().toPath
assume(artifactPath.resolve(classFileToUse).toFile.exists)

FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val classPath = copyDir.resolve(classFileToUse)
assert(classPath.toFile.exists())

@@ -407,7 +407,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
withTempPath { dir =>
val path = dir.toPath
// Setup artifact dir
FileUtils.copyDirectory(artifactPath.toFile, dir)
Utils.copyDirectory(artifactPath.toFile, dir)
val randomFilePath = path.resolve("random_file")
val testBytes = "test".getBytes(StandardCharsets.UTF_8)
Files.write(randomFilePath, testBytes)
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command

import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalactic.source.Position
import org.scalatest.Tag
@@ -29,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils

/**
* The common settings and utility functions for all v1 and v2 test suites. When a function
@@ -170,7 +170,7 @@ trait DDLCommandTestUtils extends SQLTestUtils {
def copyPartition(tableName: String, from: String, to: String): String = {
val part0Loc = getPartitionLocation(tableName, from)
val part1Loc = part0Loc.replace(from, to)
FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
Utils.copyDirectory(new File(part0Loc), new File(part1Loc))
part1Loc
}

@@ -22,7 +22,6 @@ import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}

import scala.collection.mutable.ListBuffer

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Seconds, Span}
@@ -1359,10 +1358,10 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

testStream(streamEvent, extraOptions = Map(
ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming

import java.io.File

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should._
import org.scalatest.time.{Seconds, Span}
@@ -169,7 +168,7 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter with Match
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

testStream(streamEvent) (
AddData(inputData, 1, 2, 3, 4, 5, 6),
@@ -19,11 +19,10 @@ package org.apache.spark.sql.execution.streaming

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

class OffsetSeqLogSuite extends SharedSparkSession {

@@ -180,7 +179,7 @@ class OffsetSeqLogSuite extends SharedSparkSession {
withTempDir { checkpointDir =>
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-4.0.0-tws-" + storeEncodingFormat + "/").toURI
FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
Utils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = true,
storeEncodingFormat)
}
@@ -191,7 +190,7 @@ class OffsetSeqLogSuite extends SharedSparkSession {
withTempDir { checkpointDir =>
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-3.5.1-streaming-deduplication/").toURI
FileUtils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
Utils.copyDirectory(new File(resourceUri), checkpointDir.getCanonicalFile)
verifyOffsetLogEntry(checkpointDir.getAbsolutePath, entryExists = false,
"unsaferow")
}
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming.state

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.SparkFunSuite
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -37,7 +35,7 @@ class StateStoreCompatibilitySuite extends StreamTest with StateStoreCodecsTest
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-3.0.0-streaming-statestore-codec/").toURI
val checkpointDir = Utils.createTempDir().getCanonicalFile
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

import testImplicits._

@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}
import java.util.concurrent.TimeUnit._

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
@@ -245,7 +244,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

inputData.addData(15)
inputData.addData(10, 12, 14)
@@ -850,7 +849,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
Utils.copyDirectory(new File(resourceUri), checkpointDir)

input1.addData(20)
input2.addData(30)