Commit bc6ed59

Merge SPARK-27159 fix into SPARK-27168

2 parents ab474a4 + 5564fe5
95 files changed (+1429, -554 lines)

R/pkg/R/DataFrame.R

Lines changed: 8 additions & 7 deletions
@@ -2520,8 +2520,9 @@ setMethod("dropDuplicates",
 #' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
 #' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
 #' @param joinType The type of join to perform, default 'inner'.
-#'                 Must be one of: 'inner', 'cross', 'outer', 'full', 'full_outer',
-#'                 'left', 'left_outer', 'right', 'right_outer', 'left_semi', or 'left_anti'.
+#'                 Must be one of: 'inner', 'cross', 'outer', 'full', 'fullouter', 'full_outer',
+#'                 'left', 'leftouter', 'left_outer', 'right', 'rightouter', 'right_outer', 'semi',
+#'                 'leftsemi', 'left_semi', 'anti', 'leftanti', 'left_anti'.
 #' @return A SparkDataFrame containing the result of the join operation.
 #' @family SparkDataFrame functions
 #' @aliases join,SparkDataFrame,SparkDataFrame-method
@@ -2553,14 +2554,14 @@ setMethod("join",
                     "outer", "full", "fullouter", "full_outer",
                     "left", "leftouter", "left_outer",
                     "right", "rightouter", "right_outer",
-                    "left_semi", "leftsemi", "left_anti", "leftanti")) {
+                    "semi", "left_semi", "leftsemi", "anti", "left_anti", "leftanti")) {
                   joinType <- gsub("_", "", joinType)
                   sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
                 } else {
-                  stop("joinType must be one of the following types: ",
-                       "'inner', 'cross', 'outer', 'full', 'full_outer',",
-                       "'left', 'left_outer', 'right', 'right_outer',",
-                       "'left_semi', or 'left_anti'.")
+                  stop(paste("joinType must be one of the following types:",
+                             "'inner', 'cross', 'outer', 'full', 'fullouter', 'full_outer',",
+                             "'left', 'leftouter', 'left_outer', 'right', 'rightouter', 'right_outer',",
+                             "'semi', 'leftsemi', 'left_semi', 'anti', 'leftanti' or 'left_anti'."))
                 }
               }
             }
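For reference, SparkR's join() strips underscores from joinType (gsub in the hunk above) and forwards the string to the JVM Dataset.join, which in the matching Spark version accepts the same family of join-type strings. A minimal Scala sketch of the equivalent call on the JVM side, written for spark-shell (the toy tables and column names are illustrative, not part of this commit):

import spark.implicits._

val people = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")
val tests = Seq(("Alice", "pass"), ("Carol", "fail")).toDF("name", "test")

// A semi join keeps only the matching left rows and only the left-side columns.
val semi = people.join(tests, people("name") === tests("name"), "semi")

// An anti join keeps the left rows that have no match on the right.
val anti = people.join(tests, people("name") === tests("name"), "anti")

semi.show()  // only Alice
anti.show()  // only Bob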

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 80 additions & 25 deletions
@@ -2356,40 +2356,95 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
   expect_equal(names(joined2), c("age", "name", "name", "test"))
   expect_equal(count(joined2), 3)
 
-  joined3 <- join(df, df2, df$name == df2$name, "rightouter")
+  joined3 <- join(df, df2, df$name == df2$name, "right")
   expect_equal(names(joined3), c("age", "name", "name", "test"))
   expect_equal(count(joined3), 4)
   expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2]))
-
-  joined4 <- select(join(df, df2, df$name == df2$name, "outer"),
-                    alias(df$age + 5, "newAge"), df$name, df2$test)
-  expect_equal(names(joined4), c("newAge", "name", "test"))
+
+  joined4 <- join(df, df2, df$name == df2$name, "right_outer")
+  expect_equal(names(joined4), c("age", "name", "name", "test"))
   expect_equal(count(joined4), 4)
-  expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24)
+  expect_true(is.na(collect(orderBy(joined4, joined4$age))$age[2]))
 
-  joined5 <- join(df, df2, df$name == df2$name, "leftouter")
+  joined5 <- join(df, df2, df$name == df2$name, "rightouter")
   expect_equal(names(joined5), c("age", "name", "name", "test"))
-  expect_equal(count(joined5), 3)
-  expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1]))
-
-  joined6 <- join(df, df2, df$name == df2$name, "inner")
-  expect_equal(names(joined6), c("age", "name", "name", "test"))
-  expect_equal(count(joined6), 3)
+  expect_equal(count(joined5), 4)
+  expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[2]))
 
-  joined7 <- join(df, df2, df$name == df2$name, "leftsemi")
-  expect_equal(names(joined7), c("age", "name"))
-  expect_equal(count(joined7), 3)
 
-  joined8 <- join(df, df2, df$name == df2$name, "left_outer")
-  expect_equal(names(joined8), c("age", "name", "name", "test"))
-  expect_equal(count(joined8), 3)
-  expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1]))
-
-  joined9 <- join(df, df2, df$name == df2$name, "right_outer")
-  expect_equal(names(joined9), c("age", "name", "name", "test"))
+  joined6 <- select(join(df, df2, df$name == df2$name, "outer"),
+                    alias(df$age + 5, "newAge"), df$name, df2$test)
+  expect_equal(names(joined6), c("newAge", "name", "test"))
+  expect_equal(count(joined6), 4)
+  expect_equal(collect(orderBy(joined6, joined6$name))$newAge[3], 24)
+
+  joined7 <- select(join(df, df2, df$name == df2$name, "full"),
+                    alias(df$age + 5, "newAge"), df$name, df2$test)
+  expect_equal(names(joined7), c("newAge", "name", "test"))
+  expect_equal(count(joined7), 4)
+  expect_equal(collect(orderBy(joined7, joined7$name))$newAge[3], 24)
+
+  joined8 <- select(join(df, df2, df$name == df2$name, "fullouter"),
+                    alias(df$age + 5, "newAge"), df$name, df2$test)
+  expect_equal(names(joined8), c("newAge", "name", "test"))
+  expect_equal(count(joined8), 4)
+  expect_equal(collect(orderBy(joined8, joined8$name))$newAge[3], 24)
+
+  joined9 <- select(join(df, df2, df$name == df2$name, "full_outer"),
+                    alias(df$age + 5, "newAge"), df$name, df2$test)
+  expect_equal(names(joined9), c("newAge", "name", "test"))
   expect_equal(count(joined9), 4)
-  expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2]))
-
+  expect_equal(collect(orderBy(joined9, joined9$name))$newAge[3], 24)
+
+  joined10 <- join(df, df2, df$name == df2$name, "left")
+  expect_equal(names(joined10), c("age", "name", "name", "test"))
+  expect_equal(count(joined10), 3)
+  expect_true(is.na(collect(orderBy(joined10, joined10$age))$age[1]))
+
+  joined11 <- join(df, df2, df$name == df2$name, "leftouter")
+  expect_equal(names(joined11), c("age", "name", "name", "test"))
+  expect_equal(count(joined11), 3)
+  expect_true(is.na(collect(orderBy(joined11, joined11$age))$age[1]))
+
+  joined12 <- join(df, df2, df$name == df2$name, "left_outer")
+  expect_equal(names(joined12), c("age", "name", "name", "test"))
+  expect_equal(count(joined12), 3)
+  expect_true(is.na(collect(orderBy(joined12, joined12$age))$age[1]))
+
+  joined13 <- join(df, df2, df$name == df2$name, "inner")
+  expect_equal(names(joined13), c("age", "name", "name", "test"))
+  expect_equal(count(joined13), 3)
+
+  joined14 <- join(df, df2, df$name == df2$name, "semi")
+  expect_equal(names(joined14), c("age", "name"))
+  expect_equal(count(joined14), 3)
+
+  joined14 <- join(df, df2, df$name == df2$name, "leftsemi")
+  expect_equal(names(joined14), c("age", "name"))
+  expect_equal(count(joined14), 3)
+
+  joined15 <- join(df, df2, df$name == df2$name, "left_semi")
+  expect_equal(names(joined15), c("age", "name"))
+  expect_equal(count(joined15), 3)
+
+  joined16 <- join(df2, df, df2$name == df$name, "anti")
+  expect_equal(names(joined16), c("name", "test"))
+  expect_equal(count(joined16), 1)
+
+  joined17 <- join(df2, df, df2$name == df$name, "leftanti")
+  expect_equal(names(joined17), c("name", "test"))
+  expect_equal(count(joined17), 1)
+
+  joined18 <- join(df2, df, df2$name == df$name, "left_anti")
+  expect_equal(names(joined18), c("name", "test"))
+  expect_equal(count(joined18), 1)
+
+  error_msg <- paste("joinType must be one of the following types:",
+                     "'inner', 'cross', 'outer', 'full', 'fullouter', 'full_outer',",
+                     "'left', 'leftouter', 'left_outer', 'right', 'rightouter', 'right_outer',",
+                     "'semi', 'leftsemi', 'left_semi', 'anti', 'leftanti' or 'left_anti'.")
+  expect_error(join(df2, df, df2$name == df$name, "invalid"), error_msg)
+
   merged <- merge(df, df2, by.x = "name", by.y = "name", all.x = TRUE, all.y = TRUE)
   expect_equal(count(merged), 4)
   expect_equal(names(merged), c("age", "name_x", "name_y", "test"))
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+================================================================================================
+Coalesced RDD , large scale
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
+Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
+Coalesced RDD: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Coalesce Num Partitions: 100 Num Hosts: 1 346 364 24 0.3 3458.9 1.0X
+Coalesce Num Partitions: 100 Num Hosts: 5 258 264 6 0.4 2579.0 1.3X
+Coalesce Num Partitions: 100 Num Hosts: 10 242 249 7 0.4 2415.2 1.4X
+Coalesce Num Partitions: 100 Num Hosts: 20 237 242 7 0.4 2371.7 1.5X
+Coalesce Num Partitions: 100 Num Hosts: 40 230 231 1 0.4 2299.8 1.5X
+Coalesce Num Partitions: 100 Num Hosts: 80 222 233 14 0.4 2223.0 1.6X
+Coalesce Num Partitions: 500 Num Hosts: 1 659 665 5 0.2 6590.4 0.5X
+Coalesce Num Partitions: 500 Num Hosts: 5 340 381 47 0.3 3395.2 1.0X
+Coalesce Num Partitions: 500 Num Hosts: 10 279 307 47 0.4 2788.3 1.2X
+Coalesce Num Partitions: 500 Num Hosts: 20 259 261 2 0.4 2591.9 1.3X
+Coalesce Num Partitions: 500 Num Hosts: 40 241 250 15 0.4 2406.5 1.4X
+Coalesce Num Partitions: 500 Num Hosts: 80 235 237 3 0.4 2349.9 1.5X
+Coalesce Num Partitions: 1000 Num Hosts: 1 1050 1053 4 0.1 10503.2 0.3X
+Coalesce Num Partitions: 1000 Num Hosts: 5 405 407 2 0.2 4049.5 0.9X
+Coalesce Num Partitions: 1000 Num Hosts: 10 320 322 2 0.3 3202.7 1.1X
+Coalesce Num Partitions: 1000 Num Hosts: 20 276 277 0 0.4 2762.3 1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 40 257 260 5 0.4 2571.2 1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 80 245 252 13 0.4 2448.9 1.4X
+Coalesce Num Partitions: 5000 Num Hosts: 1 3099 3145 55 0.0 30988.6 0.1X
+Coalesce Num Partitions: 5000 Num Hosts: 5 1037 1050 20 0.1 10374.4 0.3X
+Coalesce Num Partitions: 5000 Num Hosts: 10 626 633 8 0.2 6261.8 0.6X
+Coalesce Num Partitions: 5000 Num Hosts: 20 426 431 5 0.2 4258.6 0.8X
+Coalesce Num Partitions: 5000 Num Hosts: 40 328 341 22 0.3 3275.4 1.1X
+Coalesce Num Partitions: 5000 Num Hosts: 80 272 275 4 0.4 2721.4 1.3X
+Coalesce Num Partitions: 10000 Num Hosts: 1 5516 5526 9 0.0 55156.8 0.1X
+Coalesce Num Partitions: 10000 Num Hosts: 5 1956 1992 48 0.1 19560.9 0.2X
+Coalesce Num Partitions: 10000 Num Hosts: 10 1045 1057 18 0.1 10447.4 0.3X
+Coalesce Num Partitions: 10000 Num Hosts: 20 637 658 24 0.2 6373.2 0.5X
+Coalesce Num Partitions: 10000 Num Hosts: 40 431 448 15 0.2 4312.9 0.8X
+Coalesce Num Partitions: 10000 Num Hosts: 80 326 328 2 0.3 3263.4 1.1X
+
+
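The table above is raw output from an RDD coalesce benchmark run (best/average time per configuration, with the relative column normalized within each group). As a rough spark-shell illustration of the operation being timed (the sizes and the ad-hoc timing loop here are made up, not the benchmark's actual harness):

// Build an RDD with many small partitions, then coalesce without a shuffle.
val data = sc.parallelize(1 to 1000000, numSlices = 10000)
val start = System.nanoTime()
data.coalesce(100, shuffle = false).count()
println(s"coalesce + count took ${(System.nanoTime() - start) / 1e6} ms")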

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 39 additions & 29 deletions
@@ -450,27 +450,32 @@ private[deploy] class Worker(
       // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
       val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
-      val cleanupFuture = concurrent.Future {
-        val appDirs = workDir.listFiles()
-        if (appDirs == null) {
-          throw new IOException("ERROR: Failed to list files in " + appDirs)
-        }
-        appDirs.filter { dir =>
-          // the directory is used by an application - check that the application is not running
-          // when cleaning up
-          val appIdFromDir = dir.getName
-          val isAppStillRunning = appIds.contains(appIdFromDir)
-          dir.isDirectory && !isAppStillRunning &&
-            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
-        }.foreach { dir =>
-          logInfo(s"Removing directory: ${dir.getPath}")
-          Utils.deleteRecursively(dir)
-        }
-      }(cleanupThreadExecutor)
+      try {
+        val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
+          val appDirs = workDir.listFiles()
+          if (appDirs == null) {
+            throw new IOException("ERROR: Failed to list files in " + appDirs)
+          }
+          appDirs.filter { dir =>
+            // the directory is used by an application - check that the application is not running
+            // when cleaning up
+            val appIdFromDir = dir.getName
+            val isAppStillRunning = appIds.contains(appIdFromDir)
+            dir.isDirectory && !isAppStillRunning &&
+              !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
+          }.foreach { dir =>
+            logInfo(s"Removing directory: ${dir.getPath}")
+            Utils.deleteRecursively(dir)
+          }
+        }(cleanupThreadExecutor)
 
-      cleanupFuture.failed.foreach(e =>
-        logError("App dir cleanup failed: " + e.getMessage, e)
-      )(cleanupThreadExecutor)
+        cleanupFuture.failed.foreach(e =>
+          logError("App dir cleanup failed: " + e.getMessage, e)
+        )(cleanupThreadExecutor)
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup work dir as executor pool was shutdown")
+      }
 
     case MasterChanged(masterRef, masterWebUiUrl) =>
       logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -634,15 +639,20 @@ private[deploy] class Worker(
     val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
     if (shouldCleanup) {
       finishedApps -= id
-      appDirectories.remove(id).foreach { dirList =>
-        concurrent.Future {
-          logInfo(s"Cleaning up local directories for application $id")
-          dirList.foreach { dir =>
-            Utils.deleteRecursively(new File(dir))
-          }
-        }(cleanupThreadExecutor).failed.foreach(e =>
-          logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
-        )(cleanupThreadExecutor)
+      try {
+        appDirectories.remove(id).foreach { dirList =>
+          concurrent.Future {
+            logInfo(s"Cleaning up local directories for application $id")
+            dirList.foreach { dir =>
+              Utils.deleteRecursively(new File(dir))
+            }
+          }(cleanupThreadExecutor).failed.foreach(e =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+          )(cleanupThreadExecutor)
+        }
+      } catch {
+        case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
+          logWarning("Failed to cleanup application as executor pool was shutdown")
      }
      shuffleService.applicationRemoved(id)
    }
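Both hunks wrap the Future submission in try/catch so that a RejectedExecutionException from an already-shut-down cleanup pool is downgraded to a warning instead of escaping the Worker's message handling. A standalone sketch of the same pattern, with the pool and logging simplified and all names illustrative rather than taken from Worker.scala:

import java.util.concurrent.{Executors, RejectedExecutionException}
import scala.concurrent.{ExecutionContext, Future}

object CleanupSketch {
  private val cleanupPool = Executors.newSingleThreadExecutor()
  private val cleanupEc = ExecutionContext.fromExecutorService(cleanupPool)

  def scheduleCleanup(work: () => Unit): Unit = {
    try {
      // Submission itself can be rejected once the pool has been shut down,
      // so the Future construction and its failure callback both sit inside the try.
      val f = Future(work())(cleanupEc)
      f.failed.foreach(e => println(s"cleanup failed: ${e.getMessage}"))(cleanupEc)
    } catch {
      case _: RejectedExecutionException if cleanupPool.isShutdown =>
        println("cleanup pool already shut down; skipping cleanup")
    }
  }
}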

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 2 deletions
@@ -175,8 +175,8 @@ private[spark] class Executor(
 
   /**
    * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
-   * times, it should kill itself. The default value is 60. It means we will retry to send
-   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   * times, it should kill itself. The default value is 60. For example, if max failures is 60 and
+   * heartbeat interval is 10s, then it will try to send heartbeats for up to 600s (10 minutes).
    */
   private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)
 
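A quick REPL-style check of the arithmetic in the reworded comment (the 60-failure default comes from the config constant referenced above; the 10s interval is the usual executor heartbeat interval and is assumed here purely for illustration):

val heartbeatIntervalMs = 10 * 1000L  // assumed 10s heartbeat interval
val maxFailures = 60                  // default for the max-failures config above
val worstCaseMs = heartbeatIntervalMs * maxFailures
println(s"heartbeats keep being retried for ~${worstCaseMs / 1000}s, i.e. ${worstCaseMs / 60000} minutes")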

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Streaming {
+
+  private[spark] val STREAMING_DYN_ALLOCATION_ENABLED =
+    ConfigBuilder("spark.streaming.dynamicAllocation.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_TESTING =
+    ConfigBuilder("spark.streaming.dynamicAllocation.testing")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_MIN_EXECUTORS =
+    ConfigBuilder("spark.streaming.dynamicAllocation.minExecutors")
+      .intConf
+      .checkValue(_ > 0, "The min executor number of streaming dynamic " +
+        "allocation must be positive.")
+      .createOptional
+
+  private[spark] val STREAMING_DYN_ALLOCATION_MAX_EXECUTORS =
+    ConfigBuilder("spark.streaming.dynamicAllocation.maxExecutors")
+      .intConf
+      .checkValue(_ > 0, "The max executor number of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(Int.MaxValue)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_INTERVAL =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingInterval")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "The scaling interval of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(60)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingUpRatio")
+      .doubleConf
+      .checkValue(_ > 0, "The scaling up ratio of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(0.9)
+
+  private[spark] val STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO =
+    ConfigBuilder("spark.streaming.dynamicAllocation.scalingDownRatio")
+      .doubleConf
+      .checkValue(_ > 0, "The scaling down ratio of streaming dynamic " +
+        "allocation must be positive.")
+      .createWithDefault(0.3)
+}
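These entries are private[spark], so they are read from code inside the org.apache.spark namespace rather than by applications. A hedged sketch of how such consuming code would read them, where the SparkConf values set here are purely illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Streaming._

val conf = new SparkConf()
  .set("spark.streaming.dynamicAllocation.enabled", "true")
  .set("spark.streaming.dynamicAllocation.scalingInterval", "30s")

val enabled = conf.get(STREAMING_DYN_ALLOCATION_ENABLED)               // Boolean, defaults to false
val intervalSecs = conf.get(STREAMING_DYN_ALLOCATION_SCALING_INTERVAL) // Long seconds, defaults to 60
val minExecutors = conf.get(STREAMING_DYN_ALLOCATION_MIN_EXECUTORS)    // Option[Int], no default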

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 0 deletions
@@ -301,6 +301,7 @@ package object config {
       .doc("Number of subdirectories inside each path listed in spark.local.dir for " +
         "hashing Block files into.")
       .intConf
+      .checkValue(_ > 0, "The number of subdirectories must be positive.")
       .createWithDefault(64)
 
   private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
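The added checkValue means an out-of-range setting now fails fast when the entry is read rather than silently propagating. A small sketch of the same builder pattern with a hypothetical key (not the entry above, whose key lies outside this hunk), placed under org.apache.spark so the private[spark] builder and SparkConf.get(entry) are visible:

package org.apache.spark.example

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object SubDirConfigSketch {
  // Hypothetical entry mirroring the pattern in the hunk above.
  val SUB_DIR_COUNT = ConfigBuilder("spark.example.subDirectories")
    .intConf
    .checkValue(_ > 0, "The number of subdirectories must be positive.")
    .createWithDefault(64)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("spark.example.subDirectories", "0")
    // Reading the entry runs the validator, so this line throws
    // IllegalArgumentException("The number of subdirectories must be positive.").
    println(conf.get(SUB_DIR_COUNT))
  }
}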
