Commit 4739977

Merge remote-tracking branch 'spark/master' into feature/array-api-map_entries-to-master

# Conflicts:
#   sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala

2 parents: b9e2409 + e35ad3c

68 files changed (+2217, -898 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -258,6 +258,7 @@ exportMethods("%<=>%",
               "expr",
               "factorial",
               "first",
+              "flatten",
               "floor",
               "format_number",
               "format_string",

R/pkg/R/functions.R

Lines changed: 14 additions & 0 deletions
@@ -208,6 +208,7 @@ NULL
 #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
 #' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
 #' head(select(tmp, array_position(tmp$v1, 21)))
+#' head(select(tmp, flatten(tmp$v1)))
 #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
 #' head(tmp2)
 #' head(select(tmp, posexplode(tmp$v1)))
@@ -3035,6 +3036,19 @@ setMethod("array_position",
             column(jc)
           })

+#' @details
+#' \code{flatten}: Transforms an array of arrays into a single array.
+#'
+#' @rdname column_collection_functions
+#' @aliases flatten flatten,Column-method
+#' @note flatten since 2.4.0
+setMethod("flatten",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "flatten", x@jc)
+            column(jc)
+          })
+
 #' @details
 #' \code{map_keys}: Returns an unordered array containing the keys of the map.
 #'

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -918,6 +918,10 @@ setGeneric("explode_outer", function(x) { standardGeneric("explode_outer") })
 #' @name NULL
 setGeneric("expr", function(x) { standardGeneric("expr") })

+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("flatten", function(x) { standardGeneric("flatten") })
+
 #' @rdname column_datetime_diff_functions
 #' @name NULL
 setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 6 additions & 0 deletions
@@ -1502,6 +1502,12 @@ test_that("column functions", {
   result <- collect(select(df, sort_array(df[[1]])))[[1]]
   expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))

+  # Test flatten
+  df <- createDataFrame(list(list(list(list(1L, 2L), list(3L, 4L))),
+                             list(list(list(5L, 6L), list(7L, 8L)))))
+  result <- collect(select(df, flatten(df[[1]])))[[1]]
+  expect_equal(result, list(list(1L, 2L, 3L, 4L), list(5L, 6L, 7L, 8L)))
+
   # Test map_keys(), map_values() and element_at()
   df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
   result <- collect(select(df, map_keys(df$map)))[[1]]
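
For reference, below is a minimal Scala sketch of the same behavior through org.apache.spark.sql.functions.flatten, which is the JVM function the SparkR wrapper above reaches via callJStatic. The sketch is not part of this commit; the object name and local-mode setup are illustrative only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.flatten

object FlattenSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("flatten-sketch").getOrCreate()
    import spark.implicits._

    // Same shape of data as the SparkR test above: a column holding arrays of arrays.
    val df = Seq(
      Seq(Seq(1, 2), Seq(3, 4)),
      Seq(Seq(5, 6), Seq(7, 8))
    ).toDF("v1")

    // flatten removes one level of nesting, so the rows become [1,2,3,4] and [5,6,7,8].
    df.select(flatten($"v1")).show(false)

    spark.stop()
  }
}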

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 18 additions & 13 deletions
@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolE
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
@@ -282,7 +282,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
   }

@@ -296,7 +296,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   * describing the shuffle blocks that are stored at that block manager.
   */
  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])]

  /**
   * Deletes map output status information for the specified shuffle stage.
@@ -632,17 +632,18 @@ private[spark] class MapOutputTrackerMaster(
     }
   }

+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   // This method is only called in local-mode.
   def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
     shuffleStatuses.get(shuffleId) match {
       case Some (shuffleStatus) =>
         shuffleStatus.withMapStatuses { statuses =>
           MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
         }
       case None =>
-        Seq.empty
+        Iterator.empty
     }
   }

@@ -669,8 +670,9 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
   /** Remembers which map output locations are currently being fetched on an executor. */
   private val fetching = new HashSet[Int]

+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
   override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
     val statuses = getStatuses(shuffleId)
     try {
@@ -841,6 +843,7 @@ private[spark] object MapOutputTracker extends Logging {
    * Given an array of map statuses and a range of map output partitions, returns a sequence that,
    * for each block manager ID, lists the shuffle block IDs and corresponding shuffle block sizes
    * stored at that block manager.
+   * Note that empty blocks are filtered in the result.
    *
    * If any of the statuses is null (indicating a missing location due to a failed mapper),
    * throws a FetchFailedException.
@@ -857,22 +860,24 @@
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     assert (statuses != null)
-    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
-    for ((status, mapId) <- statuses.zipWithIndex) {
+    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]]
+    for ((status, mapId) <- statuses.iterator.zipWithIndex) {
       if (status == null) {
         val errorMessage = s"Missing an output location for shuffle $shuffleId"
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
         for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+          val size = status.getSizeForBlock(part)
+          if (size != 0) {
+            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+              ((ShuffleBlockId(shuffleId, mapId, part), size))
+          }
         }
       }
     }
-
-    splitsByAddress.toSeq
+    splitsByAddress.iterator
   }
 }
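
The core behavioral change in convertMapStatuses is that zero-sized blocks are dropped while grouping block sizes per block manager, and the grouping is exposed lazily as an Iterator instead of a materialized Seq. A self-contained sketch of that filter-and-group pattern follows; the Address/BlockId aliases are simplified stand-ins for Spark's BlockManagerId and ShuffleBlockId, not the real classes.

import scala.collection.mutable.{HashMap, ListBuffer}

object GroupNonEmptyBlocks {
  type Address = String   // stand-in for BlockManagerId
  type BlockId = String   // stand-in for ShuffleBlockId

  // Group (address, blockId, size) triples by address, skipping zero-sized blocks,
  // and return an Iterator -- mirroring the switch from splitsByAddress.toSeq to
  // splitsByAddress.iterator in the diff above.
  def group(blocks: Seq[(Address, BlockId, Long)]): Iterator[(Address, Seq[(BlockId, Long)])] = {
    val splitsByAddress = new HashMap[Address, ListBuffer[(BlockId, Long)]]
    for ((address, blockId, size) <- blocks if size != 0) {
      splitsByAddress.getOrElseUpdate(address, ListBuffer()) += ((blockId, size))
    }
    splitsByAddress.iterator.map { case (addr, buf) => (addr, buf.toList) }
  }

  def main(args: Array[String]): Unit = {
    val grouped = group(Seq(
      ("hostA", "shuffle_10_0_0", 0L),      // dropped: zero-sized
      ("hostA", "shuffle_10_0_1", 1000L),
      ("hostB", "shuffle_10_1_0", 10000L)))
    grouped.foreach(println)                // only non-empty blocks survive
  }
}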

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 6 additions & 2 deletions
@@ -18,6 +18,8 @@
 package org.apache.spark.deploy.history

 import java.io.{File, FileNotFoundException, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -130,8 +132,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    require(path.isDirectory(), s"Configured store directory ($path) does not exist.")
-    val dbPath = new File(path, "listing.ldb")
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
       AppStatusStore.CURRENT_VERSION, logDir.toString())
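
The FsHistoryProvider change above stops requiring that the store directory already exist and instead creates the LevelDB directory itself, restricted to the owning user. A small standalone sketch of the same java.nio pattern follows, with an illustrative path; note that POSIX file attributes only apply on POSIX-compliant filesystems and only to directories the call actually creates.

import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

object OwnerOnlyDirSketch {
  def main(args: Array[String]): Unit = {
    // rwx for the owner, nothing for group/other -- the same permission string used above.
    val perms = PosixFilePermissions.fromString("rwx------")

    // Path is illustrative; createDirectories also creates any missing parent directories.
    val dbPath = Files.createDirectories(
      new File("/tmp/spark-history-sketch/listing.ldb").toPath(),
      PosixFilePermissions.asFileAttribute(perms)).toFile()

    println(s"created ${dbPath.getAbsolutePath}")
  }
}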

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 40 additions & 14 deletions
@@ -48,7 +48,9 @@ import org.apache.spark.util.io.ChunkedByteBufferOutputStream
  * @param blockManager [[BlockManager]] for reading local blocks
  * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
  *                        For each block we also require the size (in bytes as a long field) in
- *                        order to throttle the memory usage.
+ *                        order to throttle the memory usage. Note that zero-sized blocks are
+ *                        already excluded, which happened in
+ *                        [[MapOutputTracker.convertMapStatuses]].
  * @param streamWrapper A function to wrap the returned input stream.
  * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
  * @param maxReqsInFlight max number of remote requests to fetch blocks at any given point.
@@ -62,7 +64,7 @@ final class ShuffleBlockFetcherIterator(
     context: TaskContext,
     shuffleClient: ShuffleClient,
     blockManager: BlockManager,
-    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
+    blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long)])],
     streamWrapper: (BlockId, InputStream) => InputStream,
     maxBytesInFlight: Long,
     maxReqsInFlight: Int,
@@ -74,8 +76,8 @@ final class ShuffleBlockFetcherIterator(
   import ShuffleBlockFetcherIterator._

   /**
-   * Total number of blocks to fetch. This can be smaller than the total number of blocks
-   * in [[blocksByAddress]] because we filter out zero-sized blocks in [[initialize]].
+   * Total number of blocks to fetch. This should be equal to the total number of blocks
+   * in [[blocksByAddress]] because we already filter out zero-sized blocks in [[blocksByAddress]].
    *
    * This should equal localBlocks.size + remoteBlocks.size.
    */
@@ -267,28 +269,32 @@
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]

-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 <= 0) match {
+          case Some((blockId, size)) if size < 0 =>
+            throw new BlockException(blockId, "Negative block size " + size)
+          case Some((blockId, size)) if size == 0 =>
+            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
+          case None => // do nothing.
+        }
+        localBlocks ++= blockInfos.map(_._1)
         numBlocksToFetch += localBlocks.size
       } else {
         val iterator = blockInfos.iterator
         var curRequestSize = 0L
         var curBlocks = new ArrayBuffer[(BlockId, Long)]
         while (iterator.hasNext) {
           val (blockId, size) = iterator.next()
-          // Skip empty blocks
-          if (size > 0) {
+          if (size < 0) {
+            throw new BlockException(blockId, "Negative block size " + size)
+          } else if (size == 0) {
+            throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
+          } else {
             curBlocks += ((blockId, size))
             remoteBlocks += blockId
             numBlocksToFetch += 1
             curRequestSize += size
-          } else if (size < 0) {
-            throw new BlockException(blockId, "Negative block size " + size)
           }
           if (curRequestSize >= targetRequestSize ||
               curBlocks.size >= maxBlocksInFlightPerAddress) {
@@ -306,7 +312,8 @@
         }
       }
     }
-    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
+    logInfo(s"Getting $numBlocksToFetch non-empty blocks including ${localBlocks.size}" +
+      s" local blocks and ${remoteBlocks.size} remote blocks")
     remoteRequests
   }

@@ -407,6 +414,25 @@
           logDebug("Number of requests in flight " + reqsInFlight)
         }

+        if (buf.size == 0) {
+          // We will never legitimately receive a zero-size block. All blocks with zero records
+          // have zero size and all zero-size blocks have no records (and hence should never
+          // have been requested in the first place). This statement relies on behaviors of the
+          // shuffle writers, which are guaranteed by the following test cases:
+          //
+          // - BypassMergeSortShuffleWriterSuite: "write with some empty partitions"
+          // - UnsafeShuffleWriterSuite: "writeEmptyIterator"
+          // - DiskBlockObjectWriterSuite: "commit() and close() without ever opening or writing"
+          //
+          // There is not an explicit test for SortShuffleWriter but the underlying APIs that it
+          // uses are shared by the UnsafeShuffleWriter (both writers use DiskBlockObjectWriter,
+          // which returns a zero-size from commitAndGet() in case no records were written
+          // since the last call).
+          val msg = s"Received a zero-size buffer for block $blockId from $address " +
+            s"(expectedApproxSize = $size, isNetworkReqDone=$isNetworkReqDone)"
+          throwFetchFailedException(blockId, address, new IOException(msg))
+        }
+
         val in = try {
           buf.createInputStream()
         } catch {
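
Taken together, the fetcher no longer tolerates empty blocks: because the upstream filtering in MapOutputTracker.convertMapStatuses guarantees they never arrive, a zero or negative size is now treated as an error instead of being silently skipped. A hedged, self-contained sketch of that stricter contract follows; BlockException here is a local stand-in, not Spark's class.

object ValidateBlockSizes {
  // Local stand-in for Spark's BlockException; illustrative only.
  final class BlockException(blockId: String, msg: String) extends Exception(s"$blockId: $msg")

  // Reject non-positive sizes, mirroring the checks now applied to both the local and
  // remote block lists in splitLocalRemoteBlocks.
  def validate(blocks: Seq[(String, Long)]): Unit = blocks.foreach { case (blockId, size) =>
    if (size < 0) throw new BlockException(blockId, "Negative block size " + size)
    else if (size == 0) throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
  }

  def main(args: Array[String]): Unit = {
    validate(Seq(("shuffle_10_0_1", 1000L), ("shuffle_10_0_3", 10000L)))   // passes silently
    try validate(Seq(("shuffle_10_1_1", 0L)))
    catch { case e: Exception => println(s"rejected as expected: ${e.getMessage}") }
  }
}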

core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala

Lines changed: 4 additions & 2 deletions
@@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T](
     param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] {
   private[spark] var _value = initialValue // Current value on driver

-  override def isZero: Boolean = _value == param.zero(initialValue)
+  @transient private lazy val _zero = param.zero(initialValue)
+
+  override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef])

   override def copy(): LegacyAccumulatorWrapper[R, T] = {
     val acc = new LegacyAccumulatorWrapper(initialValue, param)
@@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T](
   }

   override def reset(): Unit = {
-    _value = param.zero(initialValue)
+    _value = _zero
   }

   override def add(v: T): Unit = _value = param.addAccumulator(_value, v)
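
The LegacyAccumulatorWrapper change caches the zero value lazily and compares by reference (eq) rather than by value equality against a freshly built zero, presumably so that isZero holds right after reset() even when the wrapped value class does not define a meaningful equals. A tiny sketch of the distinction, using a hypothetical value class:

object EqVsEqualsSketch {
  // Hypothetical accumulator value type that does not override equals/hashCode.
  final class Counter(var n: Long)

  def main(args: Array[String]): Unit = {
    val cachedZero = new Counter(0L)   // plays the role of the cached _zero
    val freshZero  = new Counter(0L)   // what param.zero(initialValue) would build on each call

    // With the default (reference-based) equals, comparing against a fresh zero never succeeds.
    println(cachedZero == freshZero)   // false
    // Comparing against the cached instance by reference does, which is what the new isZero uses.
    println(cachedZero eq cachedZero)  // true
  }
}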

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 30 additions & 1 deletion
@@ -147,7 +147,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("a", "hostA", 1000), Array(1000L)))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
-    assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
+    assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
     assert(0 == masterTracker.getNumCachedSerializedBroadcast)

@@ -298,4 +298,33 @@
     }
   }

+  test("zero-sized blocks should be excluded when getMapSizesByExecutorId") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    tracker.registerShuffle(10, 2)
+
+    val size0 = MapStatus.decompressSize(MapStatus.compressSize(0L))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(size0, size1000, size0, size10000)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
+      Array(size10000, size0, size1000, size0)))
+    assert(tracker.containsShuffle(10))
+    assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
+      Seq(
+        (BlockManagerId("a", "hostA", 1000),
+          Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 3), size10000))),
+        (BlockManagerId("b", "hostB", 1000),
+          Seq((ShuffleBlockId(10, 1, 0), size10000), (ShuffleBlockId(10, 1, 2), size1000)))
+      )
+    )
+
+    tracker.unregisterShuffle(10)
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
+
 }

core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -108,7 +108,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
       val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
       (shuffleBlockId, byteOutputStream.size().toLong)
     }
-    Seq((localBlockManagerId, shuffleBlockIdsAndSizes))
+    Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator
   }

   // Create a mocked shuffle handle to pass into HashShuffleReader.
