Commit c64d27e

Merge branch 'master' into SPARK-31527
2 parents: c1964a8 + 3e83ccc

File tree: 55 files changed, +1790 -421 lines


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 1 deletion
@@ -550,7 +550,7 @@ package object config {
       "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " +
       "documentation for more details.")
     .internal()
-    .version("3.0.0")
+    .version("3.1.0")
     .booleanConf
     .createWithDefault(false)

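For orientation, the entry edited above follows Spark's internal ConfigBuilder chain; below is a minimal sketch of how such an entry is declared, with a made-up key and doc string (only the builder calls mirror the hunk; nothing here is the actual entry from package.scala):

    package org.apache.spark.internal.config

    // Hypothetical config entry, shown only to illustrate the builder chain the hunk
    // above touches; bumping .version() records the release the entry first ships in.
    private[spark] object ExampleConfigSketch {
      val EXAMPLE_FLAG =
        ConfigBuilder("spark.example.someFlag")   // assumed key, not from this commit
          .doc("Illustrative description of an internal boolean flag.")
          .internal()                             // hidden from user-facing docs
          .version("3.1.0")                       // first release containing the entry
          .booleanConf
          .createWithDefault(false)
    }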
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 36 additions & 18 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler

 import java.nio.ByteBuffer
-import java.util.{Locale, Timer, TimerTask}
+import java.util.{Timer, TimerTask}
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong

@@ -58,6 +58,11 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Ut
  *      scheduling
  *   * task-result-getter threads
  *
+ * CAUTION: Any non fatal exception thrown within Spark RPC framework can be swallowed.
+ * Thus, throwing exception in methods like resourceOffers, statusUpdate won't fail
+ * the application, but could lead to undefined behavior. Instead, we shall use method like
+ * TaskSetManger.abort() to abort a stage and then fail the application (SPARK-31485).
+ *
  * Delay Scheduling:
  *  Delay scheduling is an optimization that sacrifices job fairness for data locality in order to
  *  improve cluster and workload throughput. One useful definition of "delay" is how much time
@@ -356,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
   *                value at index 'i' corresponds to shuffledOffers[i]
   * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i]
   * @param addressesWithDescs tasks scheduler per host:port, used for barrier tasks
-  * @return tuple of (had delay schedule rejects?, option of min locality of launched task)
+  * @return tuple of (no delay schedule rejects?, option of min locality of launched task)
   */
  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
@@ -402,9 +407,7 @@ private[spark] class TaskSchedulerImpl(
          // addresses are the same as that we allocated in taskResourceAssignments since it's
          // synchronized. We don't remove the exact addresses allocated because the current
          // approach produces the identical result with less time complexity.
-         availableResources(i).getOrElse(rName,
-           throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
-           .remove(0, rInfo.addresses.size)
+         availableResources(i)(rName).remove(0, rInfo.addresses.size)
        }
        // Only update hosts for a barrier task.
        if (taskSet.isBarrier) {
@@ -469,8 +472,9 @@ private[spark] class TaskSchedulerImpl(
      resourceProfileIds: Array[Int],
      availableCpus: Array[Int],
      availableResources: Array[Map[String, Buffer[String]]],
-     rpId: Int): Int = {
-   val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+     taskSet: TaskSetManager): Int = {
+   val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
+     taskSet.taskSet.resourceProfileId)
    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
      (id == resourceProfile.id)
    }
@@ -485,9 +489,12 @@ private[spark] class TaskSchedulerImpl(
        numTasksPerExecCores
      } else {
        val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-         .getOrElse(throw new SparkException("limitingResource returns from ResourceProfile" +
-           s" $resourceProfile doesn't actually contain that task resource!")
-         )
+         .getOrElse {
+           val errorMsg = "limitingResource returns from ResourceProfile " +
+             s"$resourceProfile doesn't actually contain that task resource!"
+           taskSet.abort(errorMsg)
+           throw new SparkException(errorMsg)
+         }
        // available addresses already takes into account if there are fractional
        // task resource requests
        val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
@@ -583,7 +590,7 @@ private[spark] class TaskSchedulerImpl(
      // value is -1
      val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
        val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
-         taskSet.taskSet.resourceProfileId)
+         taskSet)
        slots
      } else {
        -1
@@ -677,11 +684,18 @@ private[spark] class TaskSchedulerImpl(
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
-       require(addressesWithDescs.size == taskSet.numTasks,
-         s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
-           s"because only ${addressesWithDescs.size} out of a total number of " +
-           s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
-           "been blacklisted or cannot fulfill task locality requirements.")
+       if (addressesWithDescs.size != taskSet.numTasks) {
+         val errorMsg =
+           s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
+             s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
+             s" tasks got resource offers. This happens because barrier execution currently " +
+             s"does not work gracefully with delay scheduling. We highly recommend you to " +
+             s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
+             s"you see this error frequently."
+         logWarning(errorMsg)
+         taskSet.abort(errorMsg)
+         throw new SparkException(errorMsg)
+       }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()
@@ -743,8 +757,12 @@ private[spark] class TaskSchedulerImpl(
        if (state == TaskState.LOST) {
          // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
          // where each executor corresponds to a single task, so mark the executor as failed.
-         val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
-           "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+         val execId = taskIdToExecutorId.getOrElse(tid, {
+           val errorMsg =
+             "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
+           taskSet.abort(errorMsg)
+           throw new SparkException(errorMsg)
+         })
          if (executorIdToRunningTaskIds.contains(execId)) {
            reason = Some(
              SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))

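The common thread in the hunks above is replacing bare `throw`/`require` calls with `TaskSetManager.abort()` followed by a throw, because an exception raised inside the RPC framework can be silently swallowed. A minimal sketch of that pattern, using a hypothetical helper name (not part of the commit) and assuming it lives in the scheduler package:

    package org.apache.spark.scheduler

    import org.apache.spark.SparkException

    // Hypothetical helper illustrating the abort-then-throw pattern: aborting the task
    // set fails the stage (and hence the job) even if the exception thrown afterwards is
    // swallowed by the RPC layer; the throw still signals the immediate caller.
    private[spark] object AbortThenThrowSketch {
      def abortAndThrow(taskSet: TaskSetManager, errorMsg: String): Nothing = {
        taskSet.abort(errorMsg)
        throw new SparkException(errorMsg)
      }
    }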
core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala

Lines changed: 18 additions & 2 deletions
@@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {

-  def initLocalClusterSparkContext(): Unit = {
+  def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
     val conf = new SparkConf()
       // Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
       // call is actually useful.
-      .setMaster("local-cluster[4, 1, 1024]")
+      .setMaster(s"local-cluster[$numWorker, 1, 1024]")
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
@@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
     initLocalClusterSparkContext()
     testBarrierTaskKilled(interruptOnKill = true)
   }
+
+  test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
+    initLocalClusterSparkContext(2)
+    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+    val dep = new OneToOneDependency[Int](rdd0)
+    // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
+    // scheduling. So, one of tasks won't be scheduled in one round of resource offer.
+    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+    val errorMsg = intercept[SparkException] {
+      rdd.barrier().mapPartitions { iter =>
+        BarrierTaskContext.get().barrier()
+        iter
+      }.collect()
+    }.getMessage
+    assert(errorMsg.contains("Fail resource offers for barrier stage"))
+  }
 }

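On the user side, the new error message recommends disabling delay scheduling when barrier stages keep failing to get a full set of resource offers. A minimal sketch of that workaround, mirroring the suite's local-cluster setup (master string and app name are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.locality.wait=0 disables delay scheduling, so barrier tasks are not held
    // back waiting for preferred locations and a full set of offers can be accepted.
    val conf = new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]")   // placeholder, mirrors the test setup
      .setAppName("barrier-delay-scheduling-workaround")
      .set("spark.locality.wait", "0")
    val sc = new SparkContext(conf)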
docs/_data/menu-sql.yaml

Lines changed: 2 additions & 2 deletions
@@ -78,12 +78,12 @@
   subitems:
     - text: Data Types
       url: sql-ref-datatypes.html
+    - text: Identifiers
+      url: sql-ref-identifier.html
     - text: Literals
       url: sql-ref-literals.html
     - text: Null Semantics
       url: sql-ref-null-semantics.html
-    - text: NaN Semantics
-      url: sql-ref-nan-semantics.html
     - text: ANSI Compliance
       url: sql-ref-ansi-compliance.html
       subitems:

docs/core-migration-guide.md

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,8 @@ license: |

 - Deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed.

+- Deprecated accumulator v1 APIs have been removed and please use v2 APIs instead.
+
 - Event log file will be written as UTF-8 encoding, and Spark History Server will replay event log files as UTF-8 encoding. Previously Spark wrote the event log file as default charset of driver JVM process, so Spark History Server of Spark 2.x is needed to read the old event log files in case of incompatible encoding.

 - A new protocol for fetching shuffle blocks is used. It's recommended that external shuffle services be upgraded when running Spark 3.0 apps. You can still use old external shuffle services by setting the configuration `spark.shuffle.useOldFetchProtocol` to `true`. Otherwise, Spark may run into errors with messages like `IllegalArgumentException: Unexpected message type: <number>`.

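To illustrate the v2 accumulator API that the new migration note points to, here is a minimal sketch (names and values are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // sc.longAccumulator replaces the removed v1 sc.accumulator API.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-v2-sketch"))
    val negatives = sc.longAccumulator("negatives")
    sc.parallelize(Seq(1, -2, 3, -4)).foreach { x =>
      if (x < 0) negatives.add(1)   // update inside an action so the count is reliable
    }
    println(negatives.value)        // 2, read back on the driver
    sc.stop()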
docs/monitoring.md

Lines changed: 0 additions & 1 deletion
@@ -1056,7 +1056,6 @@ This is the component with the largest amount of instrumented metrics
   - compilationTime (histogram)
   - generatedClassSize (histogram)
   - generatedMethodSize (histogram)
-  - hiveClientCalls.count
   - sourceCodeSize (histogram)

 - namespace=DAGScheduler

docs/sql-migration-guide.md

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ license: |

 - In Spark version 2.4 and below, you can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, for example, map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. In Spark 3.0, Spark throws `RuntimeException` when duplicated keys are found. You can set `spark.sql.mapKeyDedupPolicy` to `LAST_WIN` to deduplicate map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (for example, Parquet), the behavior is undefined.

-- In Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep using it. In Spark version 2.4 and below, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF returns null if the input values is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` returns null in Spark 2.4 and below if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.
+- In Spark 3.0, using `org.apache.spark.sql.functions.udf(AnyRef, DataType)` is not allowed by default. Remove the return type parameter to automatically switch to typed Scala udf is recommended, or set `spark.sql.legacy.allowUntypedScalaUDF` to true to keep using it. In Spark version 2.4 and below, if `org.apache.spark.sql.functions.udf(AnyRef, DataType)` gets a Scala closure with primitive-type argument, the returned UDF returns null if the input values is null. However, in Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` returns null in Spark 2.4 and below if column `x` is null, and return 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.

 - In Spark 3.0, a higher-order function `exists` follows the three-valued boolean logic, that is, if the `predicate` returns any `null`s and no `true` is obtained, then `exists` returns `null` instead of `false`. For example, `exists(array(1, null, 3), x -> x % 2 == 0)` is `null`. The previous behavior can be restored by setting `spark.sql.legacy.followThreeValuedLogicInArrayExists` to `false`.

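A short sketch of the recommended change from the untyped to the typed Scala UDF (SparkSession setup and column names are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    val spark = SparkSession.builder().master("local[2]").appName("udf-sketch").getOrCreate()
    import spark.implicits._

    // Untyped form, disallowed by default in Spark 3.0 unless
    // spark.sql.legacy.allowUntypedScalaUDF is set to true:
    //   val f = udf((x: Int) => x, IntegerType)

    // Typed form recommended by the note above: drop the return-type argument.
    val f = udf((x: Int) => x)
    Seq(Some(1), None).toDF("x").select(f($"x")).show()   // the null input stays null
    spark.stop()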
docs/sql-ref-datatypes.md

Lines changed: 119 additions & 0 deletions
@@ -19,6 +19,8 @@ license: |
   limitations under the License.
 ---

+### Supported Data Types
+
 Spark SQL and DataFrames support the following data types:

 * Numeric types
@@ -706,3 +708,120 @@ The following table shows the type names as well as aliases used in Spark SQL pa
 </table>
 </div>
 </div>
+
+### Floating Point Special Values
+
+Spark SQL supports several special floating point values in a case-insensitive manner:
+
+ * Inf/+Inf/Infinity/+Infinity: positive infinity
+   * ```FloatType```: equivalent to Scala <code>Float.PositiveInfinity</code>.
+   * ```DoubleType```: equivalent to Scala <code>Double.PositiveInfinity</code>.
+ * -Inf/-Infinity: negative infinity
+   * ```FloatType```: equivalent to Scala <code>Float.NegativeInfinity</code>.
+   * ```DoubleType```: equivalent to Scala <code>Double.NegativeInfinity</code>.
+ * NaN: not a number
+   * ```FloatType```: equivalent to Scala <code>Float.NaN</code>.
+   * ```DoubleType```: equivalent to Scala <code>Double.NaN</code>.
+
+#### Positive/Negative Infinity Semantics
+
+There is special handling for positive and negative infinity. They have the following semantics:
+
+ * Positive infinity multiplied by any positive value returns positive infinity.
+ * Negative infinity multiplied by any positive value returns negative infinity.
+ * Positive infinity multiplied by any negative value returns negative infinity.
+ * Negative infinity multiplied by any negative value returns positive infinity.
+ * Positive/negative infinity multiplied by 0 returns NaN.
+ * Positive/negative infinity is equal to itself.
+ * In aggregations, all positive infinity values are grouped together. Similarly, all negative infinity values are grouped together.
+ * Positive infinity and negative infinity are treated as normal values in join keys.
+ * Positive infinity sorts lower than NaN and higher than any other values.
+ * Negative infinity sorts lower than any other values.
+
+#### NaN Semantics
+
+There is special handling for not-a-number (NaN) when dealing with `float` or `double` types that
+do not exactly match standard floating point semantics.
+Specifically:
+
+ * NaN = NaN returns true.
+ * In aggregations, all NaN values are grouped together.
+ * NaN is treated as a normal value in join keys.
+ * NaN values go last when in ascending order, larger than any other numeric value.
+
+#### Examples
+
+{% highlight sql %}
+SELECT double('infinity') AS col;
++--------+
+|     col|
++--------+
+|Infinity|
++--------+
+
+SELECT float('-inf') AS col;
++---------+
+|      col|
++---------+
+|-Infinity|
++---------+
+
+SELECT float('NaN') AS col;
++---+
+|col|
++---+
+|NaN|
++---+
+
+SELECT double('infinity') * 0 AS col;
++---+
+|col|
++---+
+|NaN|
++---+
+
+SELECT double('-infinity') * (-1234567) AS col;
++--------+
+|     col|
++--------+
+|Infinity|
++--------+
+
+SELECT double('infinity') < double('NaN') AS col;
++----+
+| col|
++----+
+|true|
++----+
+
+SELECT double('NaN') = double('NaN') AS col;
++----+
+| col|
++----+
+|true|
++----+
+
+SELECT double('inf') = double('infinity') AS col;
++----+
+| col|
++----+
+|true|
++----+
+
+CREATE TABLE test (c1 int, c2 double);
+INSERT INTO test VALUES (1, double('infinity'));
+INSERT INTO test VALUES (2, double('infinity'));
+INSERT INTO test VALUES (3, double('inf'));
+INSERT INTO test VALUES (4, double('-inf'));
+INSERT INTO test VALUES (5, double('NaN'));
+INSERT INTO test VALUES (6, double('NaN'));
+INSERT INTO test VALUES (7, double('-infinity'));
+SELECT COUNT(*), c2 FROM test GROUP BY c2;
++---------+---------+
+| count(1)|       c2|
++---------+---------+
+|        2|      NaN|
+|        2|-Infinity|
+|        3| Infinity|
++---------+---------+
+{% endhighlight %}

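The same special-value semantics are visible through the DataFrame API; a brief Scala sketch (column name is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("float-special-values").getOrCreate()
    import spark.implicits._

    // NaN values group together, and positive infinity sorts above all other values
    // but below NaN, matching the SQL examples above.
    val df = Seq(Double.NaN, Double.NaN, Double.PositiveInfinity, Double.NegativeInfinity, 1.0)
      .toDF("c2")
    df.groupBy("c2").count().show()   // the NaN group has count 2
    df.orderBy("c2").show()           // -Infinity, 1.0, Infinity, NaN, NaN
    spark.stop()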