
Commit 96b2280

Merge branch 'master' into issues/SPARK-26060/set_command
2 parents: 4e7b6bb + fa0d4bf

File tree

27 files changed: +529 -439 lines

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 16 additions & 6 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.broadcast

 import java.io._
+import java.lang.ref.SoftReference
 import java.nio.ByteBuffer
 import java.util.zip.Adler32

@@ -61,9 +62,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
    * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
    * which builds this value by reading blocks from the driver and/or other executors.
    *
-   * On the driver, if the value is required, it is read lazily from the block manager.
+   * On the driver, if the value is required, it is read lazily from the block manager. We hold
+   * a soft reference so that it can be garbage collected if required, as we can always reconstruct
+   * in the future.
    */
-  @transient private lazy val _value: T = readBroadcastBlock()
+  @transient private var _value: SoftReference[T] = _

   /** The compression codec to use, or None if compression is disabled */
   @transient private var compressionCodec: Option[CompressionCodec] = _
@@ -92,8 +95,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   /** The checksum for all the blocks. */
   private var checksums: Array[Int] = _

-  override protected def getValue() = {
-    _value
+  override protected def getValue() = synchronized {
+    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
+    if (memoized != null) {
+      memoized
+    } else {
+      val newlyRead = readBroadcastBlock()
+      _value = new SoftReference[T](newlyRead)
+      newlyRead
+    }
   }

   private def calcChecksum(block: ByteBuffer): Int = {
@@ -205,8 +215,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }

   private def readBroadcastBlock(): T = Utils.tryOrIOException {
-    TorrentBroadcast.synchronized {
-      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+    val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+    broadcastCache.synchronized {

       Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
         setConf(SparkEnv.get.conf)
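
The new getValue() memoizes the broadcast value behind a java.lang.ref.SoftReference, so the JVM may reclaim it under memory pressure and the value can be rebuilt later via readBroadcastBlock(). A minimal, self-contained sketch of that memoization pattern (class and function names here are illustrative, not Spark's):

```scala
import java.lang.ref.SoftReference

// Hypothetical soft-reference memoizer; `recompute` plays the role of readBroadcastBlock().
class SoftMemo[T <: AnyRef](recompute: () => T) {
  private var ref: SoftReference[T] = _

  def get: T = synchronized {
    // SoftReference.get returns null once the GC has cleared the referent.
    val cached = if (ref == null) null.asInstanceOf[T] else ref.get
    if (cached != null) {
      cached
    } else {
      val fresh = recompute()
      ref = new SoftReference[T](fresh)
      fresh
    }
  }
}

object SoftMemoExample extends App {
  val memo = new SoftMemo[String](() => s"computed at ${System.nanoTime()}")
  println(memo.get) // computed on first access
  println(memo.get) // served from the soft reference until the GC clears it
}
```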

core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -61,11 +61,12 @@ private[deploy] object DependencyUtils extends Logging {
       hadoopConf: Configuration,
       secMgr: SecurityManager): String = {
     val targetDir = Utils.createTempDir()
+    val userJarName = userJar.split(File.separatorChar).last
     Option(jars)
       .map {
         resolveGlobPaths(_, hadoopConf)
           .split(",")
-          .filterNot(_.contains(userJar.split("/").last))
+          .filterNot(_.contains(userJarName))
           .mkString(",")
       }
       .filterNot(_ == "")
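
The fix computes the application jar's file name once, splitting on File.separatorChar rather than a hard-coded "/", and then drops matching entries from the --jars list. A small, hypothetical stand-alone sketch of that filtering (paths and names are made up for illustration):

```scala
import java.io.File

object ExcludeAppJarExample extends App {
  // Hypothetical inputs; in spark-submit these come from the primary app jar and --jars.
  val userJar = s"file:${File.separator}path${File.separator}to${File.separator}myApp.jar"
  val jars = "file:/myJar1.jar,file:/myApp.jar,file:/myJar2.jar"

  // Take the last path segment of the app jar using the platform separator, then drop
  // any --jars entry that refers to the same file name.
  val userJarName = userJar.split(File.separatorChar).last
  val filtered = jars.split(",").filterNot(_.contains(userJarName)).mkString(",")

  println(filtered) // file:/myJar1.jar,file:/myJar2.jar
}
```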

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -962,6 +962,25 @@ class SparkSubmitSuite
     }
   }

+  test("remove copies of application jar from classpath") {
+    val fs = File.separator
+    val sparkConf = new SparkConf(false)
+    val hadoopConf = new Configuration()
+    val secMgr = new SecurityManager(sparkConf)
+
+    val appJarName = "myApp.jar"
+    val jar1Name = "myJar1.jar"
+    val jar2Name = "myJar2.jar"
+    val userJar = s"file:/path${fs}to${fs}app${fs}jar$fs$appJarName"
+    val jars = s"file:/$jar1Name,file:/$appJarName,file:/$jar2Name"
+
+    val resolvedJars = DependencyUtils
+      .resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf, secMgr)
+
+    assert(!resolvedJars.contains(appJarName))
+    assert(resolvedJars.contains(jar1Name) && resolvedJars.contains(jar2Name))
+  }
+
   test("Avoid re-upload remote resources in yarn client mode") {
     val hadoopConf = new Configuration()
     updateConfWithFakeS3Fs(hadoopConf)

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,8 @@ displayTitle: Spark SQL Upgrading Guide

   - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more.

+  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions remove duplicated map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet); the behavior is undefined.
+
   - In Spark version 2.4 and earlier, the `SET` command works without any warnings even if the specified key is for `SparkConf` entries and it has no effect because the command does not update `SparkConf`, but the behavior might confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You can disable such a check by setting `spark.sql.execution.setCommandRejectsSparkConfs` to `false`.

 ## Upgrading From Spark SQL 2.3 to 2.4
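
A hedged illustration of the duplicated-map-key note added above, runnable in spark-shell (it assumes a SparkSession named `spark`; exact show formatting may vary by version):

```scala
// Spark 2.4 and earlier: the duplicated key survives and a lookup returns the first
// occurrence ("a"). Since Spark 3.0: duplicates are removed with last-wins, so the
// lookup returns "b".
spark.sql("SELECT map(1, 'a', 1, 'b')[1] AS value").show()

// Collecting shows the deduplicated map under the Spark 3.0 behavior.
val m = spark.sql("SELECT map(1, 'a', 1, 'b') AS m").head().getMap[Int, String](0)
println(m) // Map(1 -> b)
```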

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 3 additions & 1 deletion
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.avro

-import java.math.{BigDecimal}
+import java.math.BigDecimal
 import java.nio.ByteBuffer

 import scala.collection.JavaConverters._
@@ -218,6 +218,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
           i += 1
         }

+        // The Avro map will never have null or duplicated map keys, so it's safe to create
+        // an ArrayBasedMapData directly here.
         updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))

       case (UNION, _) =>

mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala

Lines changed: 22 additions & 13 deletions
@@ -37,7 +37,7 @@ import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
 import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
@@ -169,6 +169,12 @@ final class OneVsRestModel private[ml] (
     // Check schema
     transformSchema(dataset.schema, logging = true)

+    if (getPredictionCol == "" && getRawPredictionCol == "") {
+      logWarning(s"$uid: OneVsRestModel.transform() was called as NOOP" +
+        " since no output columns were set.")
+      return dataset.toDF
+    }
+
     // determine the input columns: these need to be passed through
     val origCols = dataset.schema.map(f => col(f.name))

@@ -209,6 +215,9 @@ final class OneVsRestModel private[ml] (
       newDataset.unpersist()
     }

+    var predictionColNames = Seq.empty[String]
+    var predictionColumns = Seq.empty[Column]
+
     if (getRawPredictionCol != "") {
       val numClass = models.length

@@ -219,24 +228,24 @@ final class OneVsRestModel private[ml] (
         Vectors.dense(predArray)
       }

-      // output the index of the classifier with highest confidence as prediction
-      val labelUDF = udf { (rawPredictions: Vector) => rawPredictions.argmax.toDouble }
+      predictionColNames = predictionColNames :+ getRawPredictionCol
+      predictionColumns = predictionColumns :+ rawPredictionUDF(col(accColName))
+    }

-      // output confidence as raw prediction, label and label metadata as prediction
-      aggregatedDataset
-        .withColumn(getRawPredictionCol, rawPredictionUDF(col(accColName)))
-        .withColumn(getPredictionCol, labelUDF(col(getRawPredictionCol)), labelMetadata)
-        .drop(accColName)
-    } else {
+    if (getPredictionCol != "") {
       // output the index of the classifier with highest confidence as prediction
       val labelUDF = udf { (predictions: Map[Int, Double]) =>
         predictions.maxBy(_._2)._1.toDouble
       }
-      // output label and label metadata as prediction
-      aggregatedDataset
-        .withColumn(getPredictionCol, labelUDF(col(accColName)), labelMetadata)
-        .drop(accColName)
+
+      predictionColNames = predictionColNames :+ getPredictionCol
+      predictionColumns = predictionColumns :+ labelUDF(col(accColName))
+        .as(getPredictionCol, labelMetadata)
     }
+
+    aggregatedDataset
+      .withColumns(predictionColNames, predictionColumns)
+      .drop(accColName)
   }

   @Since("1.4.1")
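
The rewritten transform collects the requested output column names and expressions first and appends them in one pass, skipping any output whose column name is empty. A minimal, hypothetical sketch of that pattern outside MLlib, using the public withColumn in a fold (all names and data here are illustrative):

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ConditionalOutputColumns {
  // Hypothetical helper: append only the output columns the caller asked for
  // (an empty name means "skip this output"), mirroring the OneVsRestModel change.
  def addRequestedColumns(df: DataFrame, requested: Seq[(String, Column)]): DataFrame = {
    val active = requested.filter { case (name, _) => name.nonEmpty }
    active.foldLeft(df) { case (acc, (name, expr)) => acc.withColumn(name, expr) }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1.0, 2.0), (3.0, 4.0)).toDF("a", "b")

    // "prediction" is requested, the second output is disabled via an empty name.
    val out = addRequestedColumns(df, Seq(
      "prediction" -> (col("a") + col("b")),
      "" -> lit(0.0)))

    out.show() // columns: a, b, prediction
    spark.stop()
  }
}
```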

mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala

Lines changed: 26 additions & 0 deletions
@@ -290,6 +290,32 @@ class OneVsRestSuite extends MLTest with DefaultReadWriteTest {
     checkModelData(ovaModel, newOvaModel)
   }

+  test("should ignore empty output cols") {
+    val lr = new LogisticRegression().setMaxIter(1)
+    val ovr = new OneVsRest().setClassifier(lr)
+    val ovrModel = ovr.fit(dataset)
+
+    val output1 = ovrModel.setPredictionCol("").setRawPredictionCol("")
+      .transform(dataset)
+    assert(output1.schema.fieldNames.toSet ===
+      Set("label", "features"))
+
+    val output2 = ovrModel.setPredictionCol("prediction").setRawPredictionCol("")
+      .transform(dataset)
+    assert(output2.schema.fieldNames.toSet ===
+      Set("label", "features", "prediction"))
+
+    val output3 = ovrModel.setPredictionCol("").setRawPredictionCol("rawPrediction")
+      .transform(dataset)
+    assert(output3.schema.fieldNames.toSet ===
+      Set("label", "features", "rawPrediction"))
+
+    val output4 = ovrModel.setPredictionCol("prediction").setRawPredictionCol("rawPrediction")
+      .transform(dataset)
+    assert(output4.schema.fieldNames.toSet ===
+      Set("label", "features", "prediction", "rawPrediction"))
+  }
+
   test("should support all NumericType labels and not support other types") {
     val ovr = new OneVsRest().setClassifier(new LogisticRegression().setMaxIter(1))
     MLTestingUtils.checkNumericTypes[OneVsRestModel, OneVsRest](

python/pyspark/sql/functions.py

Lines changed: 5 additions & 5 deletions
@@ -2656,11 +2656,11 @@ def map_concat(*cols):
     >>> from pyspark.sql.functions import map_concat
     >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as map1, map(3, 'c', 1, 'd') as map2")
     >>> df.select(map_concat("map1", "map2").alias("map3")).show(truncate=False)
-    +--------------------------------+
-    |map3                            |
-    +--------------------------------+
-    |[1 -> a, 2 -> b, 3 -> c, 1 -> d]|
-    +--------------------------------+
+    +------------------------+
+    |map3                    |
+    +------------------------+
+    |[1 -> d, 2 -> b, 3 -> c]|
+    +------------------------+
     """
     sc = SparkContext._active_spark_context
     if len(cols) == 1 and isinstance(cols[0], (list, set)):

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java

Lines changed: 3 additions & 0 deletions
@@ -28,6 +28,9 @@
  * Currently we just use 2 UnsafeArrayData to represent UnsafeMapData, with extra 8 bytes at head
  * to indicate the number of bytes of the unsafe key array.
  * [unsafe key array numBytes] [unsafe key array] [unsafe value array]
+ *
+ * Note that the user is responsible for guaranteeing that the key array does not have duplicated
+ * elements, otherwise the behavior is undefined.
  */
 // TODO: Use a more efficient format which doesn't depend on unsafe array.
 public final class UnsafeMapData extends MapData {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala

Lines changed: 0 additions & 6 deletions
@@ -431,12 +431,6 @@ object CatalystTypeConverters {
         map,
         (key: Any) => convertToCatalyst(key),
         (value: Any) => convertToCatalyst(value))
-    case (keys: Array[_], values: Array[_]) =>
-      // case for mapdata with duplicate keys
-      new ArrayBasedMapData(
-        new GenericArrayData(keys.map(convertToCatalyst)),
-        new GenericArrayData(values.map(convertToCatalyst))
-      )
     case other => other
   }
