
Commit 4a8c4be

[SPARK-53073][CORE][SQL][YARN][SS] Support copyDirectory in SparkFileUtils and JavaUtils
### What changes were proposed in this pull request?

This PR aims to support `copyDirectory` in `SparkFileUtils` and `JavaUtils`.

### Why are the changes needed?

To provide a better implementation.

**BEFORE**

```scala
scala> spark.time(org.apache.commons.io.FileUtils.copyDirectory(new java.io.File("/tmp/spark"), new java.io.File("/tmp/spark2")))
Time taken: 5128 ms
```

**AFTER**

```scala
scala> spark.time(org.apache.spark.network.util.JavaUtils.copyDirectory(new java.io.File("/tmp/spark"), new java.io.File("/tmp/spark2")))
Time taken: 2979 ms
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51786 from dongjoon-hyun/SPARK-53073.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 322b3d0 commit 4a8c4be
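For reference, a minimal usage sketch of the entry points this commit adds; the paths below are placeholders, not taken from the change itself:

```scala
// Sketch only: the /tmp paths are hypothetical. The source directory must
// exist and be a directory, otherwise copyDirectory throws
// IllegalArgumentException.
import java.io.File

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils

val src = new File("/tmp/spark-src")
val dst = new File("/tmp/spark-dst")
src.mkdirs()

JavaUtils.copyDirectory(src, dst) // Java side (common/utils)
Utils.copyDirectory(src, dst)     // Scala side; delegates to JavaUtils
```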

File tree: 26 files changed (+83, -70 lines)

common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 29 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@
 import java.nio.file.Path;
 import java.nio.file.FileVisitResult;
 import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -74,6 +75,34 @@ public static void deleteQuietly(File file) {
     }
   }
 
+  /** Copy src to the target directory simply. File attribute times are not copied. */
+  public static void copyDirectory(File src, File dst) throws IOException {
+    if (src == null || dst == null || !src.exists() || !src.isDirectory() ||
+        (dst.exists() && !dst.isDirectory())) {
+      throw new IllegalArgumentException("Invalid input file " + src + " or directory " + dst);
+    }
+    Path from = src.toPath().toAbsolutePath().normalize();
+    Path to = dst.toPath().toAbsolutePath().normalize();
+    if (to.startsWith(from)) {
+      throw new IllegalArgumentException("Cannot copy directory to itself or its subdirectory");
+    }
+    Files.createDirectories(to);
+    Files.walkFileTree(from, new SimpleFileVisitor<Path>() {
+      @Override
+      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+          throws IOException {
+        Files.createDirectories(to.resolve(from.relativize(dir)));
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override
+      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+        Files.copy(file, to.resolve(from.relativize(file)), StandardCopyOption.REPLACE_EXISTING);
+        return FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
   /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
   public static int nonNegativeHash(Object obj) {
     if (obj == null) { return 0; }
```
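A hedged sketch of the self-copy guard above: after both paths are normalized, a destination nested under the source is rejected before any file is touched. The path is a placeholder:

```scala
// Sketch only: /tmp/guard-demo is a hypothetical directory.
import java.io.File

import org.apache.spark.network.util.JavaUtils

val base = new File("/tmp/guard-demo")
base.mkdirs()
try {
  // Destination is a subdirectory of the source, so this throws.
  JavaUtils.copyDirectory(base, new File(base, "nested"))
} catch {
  case e: IllegalArgumentException =>
    println(e.getMessage) // "Cannot copy directory to itself or its subdirectory"
}
```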

common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -147,6 +147,11 @@ private[spark] trait SparkFileUtils extends Logging {
     }.toFile
   }
 
+  /** Copy src to the target directory simply. File attribute times are not copied. */
+  def copyDirectory(src: File, dir: File): Unit = {
+    JavaUtils.copyDirectory(src, dir)
+  }
+
   /** Copy file to the target directory simply. File attribute times are not copied. */
   def copyFileToDirectory(file: File, dir: File): Unit = {
     if (file == null || dir == null || !file.exists() || (dir.exists() && !dir.isDirectory())) {
```

connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -29,7 +29,6 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import org.apache.commons.io.FileUtils
 import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -1729,7 +1728,7 @@ abstract class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBa
     val checkpointDir = Utils.createTempDir().getCanonicalFile
     // Copy the checkpoint to a temp dir to prevent changes to the original.
     // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
-    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+    Utils.copyDirectory(new File(resourceUri), checkpointDir)
 
     testStream(query)(
       StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
```

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -21,7 +21,6 @@ import java.io.File
 import java.nio.charset.StandardCharsets
 
 import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
@@ -173,7 +172,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
     val dbBackend = DBBackend.byName(dbBackendName)
     logWarning(s"Use ${dbBackend.name()} as the implementation of " +
       s"${SHUFFLE_SERVICE_DB_BACKEND.key}")
-    FileUtils.copyDirectory(registeredExecFile, execStateCopy)
+    Utils.copyDirectory(registeredExecFile, execStateCopy)
     assert(!ShuffleTestAccessor
       .reloadRegisteredExecutors(dbBackend, execStateCopy).isEmpty)
   }
```

scalastyle-config.xml

Lines changed: 5 additions & 0 deletions
```diff
@@ -322,6 +322,11 @@ This file is divided into 3 sections:
     <customMessage>Use copyFileToDirectory of SparkFileUtils or Utils instead.</customMessage>
   </check>
 
+  <check customId="copyDirectory" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex"> FileUtils\.copyDirectory</parameter></parameters>
+    <customMessage>Use copyDirectory of JavaUtils/SparkFileUtils/Utils instead.</customMessage>
+  </check>
+
   <check customId="commonslang2" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
     <customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
```
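As a hedged illustration of the new scalastyle rule (not part of the commit): the banned regex matches a bare ` FileUtils.copyDirectory` call, so the commented line below would now fail the build, while the replacement passes. Paths are placeholders:

```scala
// Sketch only: src and dst are hypothetical java.io.File values.
import java.io.File

import org.apache.spark.util.Utils

val src = new File("/tmp/a")
val dst = new File("/tmp/b")
src.mkdirs()

// Scalastyle error after this commit (matches " FileUtils\.copyDirectory"):
//   FileUtils.copyDirectory(src, dst)

Utils.copyDirectory(src, dst) // the replacement the rule's message suggests
```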

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ClassFinderSuite.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -18,8 +18,6 @@ package org.apache.spark.sql.connect.client
 
 import java.nio.file.Paths
 
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark.sql.connect.test.ConnectFunSuite
 import org.apache.spark.util.SparkFileUtils
 
@@ -32,7 +30,7 @@ class ClassFinderSuite extends ConnectFunSuite {
     requiredClasses.foreach(className =>
       assume(classResourcePath.resolve(className).toFile.exists))
     val copyDir = SparkFileUtils.createTempDir().toPath
-    FileUtils.copyDirectory(classResourcePath.toFile, copyDir.toFile)
+    SparkFileUtils.copyDirectory(classResourcePath.toFile, copyDir.toFile)
     val monitor = new REPLClassDirMonitor(copyDir.toAbsolutePath.toString)
 
     def checkClasses(monitor: REPLClassDirMonitor, additionalClasses: Seq[String] = Nil): Unit = {
```

sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -340,7 +340,7 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
     val sparkContext = session.sparkContext
     val newArtifactManager = new ArtifactManager(newSession)
     if (artifactPath.toFile.exists()) {
-      FileUtils.copyDirectory(artifactPath.toFile, newArtifactManager.artifactPath.toFile)
+      Utils.copyDirectory(artifactPath.toFile, newArtifactManager.artifactPath.toFile)
     }
     val blockManager = sparkContext.env.blockManager
     val newBlockIds = cachedBlockIdList.asScala.map { blockId =>
```

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark.{CleanerListener, SparkRuntimeException}
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
@@ -1405,7 +1403,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       .filter(_.startsWith("Location:"))
       .head
       .replace("Location: file:", "")
-    FileUtils.copyDirectory(
+    Utils.copyDirectory(
       new File(part0Loc),
       new File(part0Loc.replace("part=0", "part=1")))
```

sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala

Lines changed: 9 additions & 9 deletions
```diff
@@ -56,7 +56,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     assume(artifactPath.resolve("smallClassFile.class").toFile.exists)
 
     val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val stagingPath = copyDir.resolve("smallClassFile.class")
     assert(stagingPath.toFile.exists())
     val remotePath = Paths.get("classes/smallClassFile.class")
@@ -72,7 +72,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     assume(artifactPath.resolve("Hello.class").toFile.exists)
 
     val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val stagingPath = copyDir.resolve("Hello.class")
     assert(stagingPath.toFile.exists())
     val remotePath = Paths.get("classes/Hello.class")
@@ -98,7 +98,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     assume(artifactPath.resolve("Hello.class").toFile.exists)
 
     val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val stagingPath = copyDir.resolve("Hello.class")
     assert(stagingPath.toFile.exists())
     val remotePath = Paths.get("classes/Hello.class")
@@ -180,7 +180,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
 
     val copyDir = Utils.createTempDir().toPath
     val destFSDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val stagingPath = copyDir.resolve("smallClassFile.class")
     val remotePath = Paths.get("forward_to_fs", destFSDir.toString, "smallClassFileCopied.class")
     assert(stagingPath.toFile.exists())
@@ -203,7 +203,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     val blockId = CacheId(spark.sessionUUID, "abc")
     // Setup artifact dir
     val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     try {
       artifactManager.addArtifact(remotePath, stagingPath, None)
       val stagingPathFile = copyDir.resolve("smallClassFile.class")
@@ -246,7 +246,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
 
     def addHelloClass(session: SparkSession): Unit = {
       val copyDir = Utils.createTempDir().toPath
-      FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+      Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
       val stagingPath = copyDir.resolve("Hello.class")
       val remotePath = Paths.get("classes/Hello.class")
       assert(stagingPath.toFile.exists())
@@ -299,7 +299,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     assume(artifactPath.resolve("Hello.class").toFile.exists)
 
     val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val stagingPath = copyDir.resolve("Hello.class")
     val remotePath = Paths.get("classes/Hello.class")
 
@@ -368,7 +368,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     val copyDir = Utils.createTempDir().toPath
     assume(artifactPath.resolve(classFileToUse).toFile.exists)
 
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
+    Utils.copyDirectory(artifactPath.toFile, copyDir.toFile)
     val classPath = copyDir.resolve(classFileToUse)
     assert(classPath.toFile.exists())
 
@@ -407,7 +407,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
     withTempPath { dir =>
       val path = dir.toPath
       // Setup artifact dir
-      FileUtils.copyDirectory(artifactPath.toFile, dir)
+      Utils.copyDirectory(artifactPath.toFile, dir)
       val randomFilePath = path.resolve("random_file")
       val testBytes = "test".getBytes(StandardCharsets.UTF_8)
       Files.write(randomFilePath, testBytes)
```

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -29,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
 
 /**
  * The common settings and utility functions for all v1 and v2 test suites. When a function
@@ -170,7 +170,7 @@ trait DDLCommandTestUtils extends SQLTestUtils {
   def copyPartition(tableName: String, from: String, to: String): String = {
     val part0Loc = getPartitionLocation(tableName, from)
     val part1Loc = part0Loc.replace(from, to)
-    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+    Utils.copyDirectory(new File(part0Loc), new File(part1Loc))
     part1Loc
   }
```
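A hedged sketch of the `copyPartition` pattern above: one partition directory is duplicated to fabricate a sibling partition for the test. The warehouse location is hypothetical:

```scala
// Sketch only: the warehouse path is a placeholder, not from the commit.
import java.io.File

import org.apache.spark.util.Utils

val part0Loc = "/tmp/warehouse/t/part=0"
val part1Loc = part0Loc.replace("part=0", "part=1")
new File(part0Loc).mkdirs()
Utils.copyDirectory(new File(part0Loc), new File(part1Loc))
```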
