@@ -27,7 +27,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import com.google.common.base.Throwables
 
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
@@ -68,11 +69,9 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
   // VisibleForTesting
   private[shuffle] def createErrorHandler(): BlockPushErrorHandler = {
     new BlockPushErrorHandler() {
-      /**
-       * For a connection exception against a particular host, we will stop pushing any
-       * blocks to just that host and continue push blocks to other hosts. So, here push of
-       * all blocks will only stop when it is "Too Late". Also see updateStateAndCheckIfPushMore.
-       */
+      // For a connection exception against a particular host, we will stop pushing any
+      // blocks to just that host and continue pushing blocks to other hosts. So, push of
+      // all blocks only stops when it is "Too Late". Also see updateStateAndCheckIfPushMore.
       override def shouldRetryError(t: Throwable): Boolean = {
         // If the block is too late, there is no need to retry it
         !Throwables.getStackTraceAsString(t).contains(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)
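
The retry rule above is worth spelling out: every failure is treated as retryable except a "too late" rejection, which means the remote shuffle has already been finalized and re-pushing that block can never succeed. A minimal standalone sketch of the rule, with Guava's Throwables and the BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX constant replaced by hypothetical stand-ins:

object RetryRuleSketch {
  // Hypothetical stand-in for TOO_LATE_MESSAGE_SUFFIX; the real constant
  // lives in Spark's network-shuffle module.
  val TooLateSuffix = "too late"

  // Local replacement for Guava's Throwables.getStackTraceAsString.
  private def stackTraceAsString(t: Throwable): String = {
    val sw = new java.io.StringWriter()
    t.printStackTrace(new java.io.PrintWriter(sw))
    sw.toString
  }

  // Retry everything except "too late" failures: once the remote shuffle is
  // finalized, re-pushing the block cannot succeed.
  def shouldRetryError(t: Throwable): Boolean =
    !stackTraceAsString(t).contains(TooLateSuffix)

  def main(args: Array[String]): Unit = {
    println(shouldRetryError(new java.io.IOException("connection reset"))) // true
    println(shouldRetryError(new RuntimeException("block too late")))      // false
  }
}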
@@ -111,7 +110,7 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
 
     maxBytesInFlight = conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
     maxReqsInFlight = conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)
-    maxBlocksInFlightPerAddress = conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS)
+    maxBlocksInFlightPerAddress = conf.get(REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS)
 
     val requests = prepareBlockPushRequests(numPartitions, partitionId, dep.shuffleId, dataFile,
       partitionLengths, mergerLocs, transportConf, maxBlockSizeToPush, maxBlockBatchSize)
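
For context on the two accessors above: SparkConf.getSizeAsMb parses suffixed size strings ("48m", "1g") into mebibytes, which the writer then scales to bytes, and getInt falls back to Int.MaxValue when the cap is unset. A small sketch at the defaults (plain SparkConf, no cluster needed):

import org.apache.spark.SparkConf

object ConfReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // 48 MiB default, scaled to bytes exactly as in the diff.
    val maxBytesInFlight =
      conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
    val maxReqsInFlight = conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)
    println(s"$maxBytesInFlight bytes, $maxReqsInFlight reqs") // 50331648 bytes at defaults
  }
}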
@@ -382,9 +381,9 @@ private[spark] abstract class ShuffleWriter[K, V] extends Logging {
       // exceeds the max block size to push limit. This guarantees that each PushRequest
       // represents continuous blocks in the shuffle file to be pushed to the same shuffle
       // service, and does not go beyond existing limitations.
-      if (currentReqSize + blockSize <= maxBlockBatchSize &&
-        blocks.size < maxBlocksInFlightPerAddress &&
-        mergerId == currentMergerId && blockSize <= maxBlockSizeToPush) {
+      if (currentReqSize + blockSize <= maxBlockBatchSize
+        && blocks.size < maxBlocksInFlightPerAddress
+        && mergerId == currentMergerId && blockSize <= maxBlockSizeToPush) {
         // Add current block to current batch
         currentReqSize += blockSize
       } else {
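
The condition above decides whether the current block joins the batch being built or seals it and starts a new one. The sketch below isolates that grouping rule with illustrative names and types; unlike the real writer, which skips blocks larger than maxBlockSizeToPush entirely, this sketch simply starts a new request whenever any limit trips:

import scala.collection.mutable.ArrayBuffer

object BatchingSketch {
  final case class Block(mergerId: Int, size: Long)

  def group(blocks: Seq[Block],
            maxBlockBatchSize: Long,
            maxBlocksInFlightPerAddress: Int): Seq[Seq[Block]] = {
    val requests = ArrayBuffer(ArrayBuffer.empty[Block])
    var currentReqSize = 0L
    for (b <- blocks) {
      val current = requests.last
      val sameMerger = current.isEmpty || current.head.mergerId == b.mergerId
      if (currentReqSize + b.size <= maxBlockBatchSize &&
          current.size < maxBlocksInFlightPerAddress && sameMerger) {
        current += b                 // add current block to the current batch
        currentReqSize += b.size
      } else {
        requests += ArrayBuffer(b)   // seal the batch, start a new request
        currentReqSize = b.size
      }
    }
    requests.filter(_.nonEmpty).map(_.toSeq).toSeq
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq(Block(1, 10), Block(1, 20), Block(2, 5), Block(2, 100))
    println(group(blocks, maxBlockBatchSize = 64, maxBlocksInFlightPerAddress = 3))
    // The merger change at the third block and the byte limit at the fourth
    // each start a new request: [[B(1,10), B(1,20)], [B(2,5)], [B(2,100)]]
  }
}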
@@ -442,6 +441,7 @@ private[spark] object ShuffleWriter {
    * @param reqBuffer a chunk of data in the shuffle data file corresponding to the continuous
    *                  blocks represented in this request
    */
+  @Since("3.1.0")
   case class PushRequest(
       address: BlockManagerId,
       blocks: Seq[(BlockId, Long)],
@@ -454,6 +454,7 @@ private[spark] object ShuffleWriter {
    * @param blockId blockId
    * @param failure the exception if the push was unsuccessful; null otherwise
    */
+  @Since("3.1.0")
   private case class PushResult(
       blockId: String,
       failure: Throwable
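
For orientation, a hedged sketch of how a PushResult-style record is interpreted downstream: a null failure means the push succeeded, otherwise the retry rule sketched earlier decides whether the block is re-pushed. The type and block-id string here are local stand-ins; the real PushResult is private to Spark's shuffle package:

object PushResultSketch {
  final case class PushResult(blockId: String, failure: Throwable)

  // A null failure marks success, mirroring the scaladoc above.
  def describe(r: PushResult): String =
    if (r.failure == null) s"${r.blockId}: pushed"
    else s"${r.blockId}: failed (${r.failure.getMessage})"

  def main(args: Array[String]): Unit = {
    println(describe(PushResult("shufflePush_0_1_2", null)))
    println(describe(PushResult("shufflePush_0_1_3",
      new java.io.IOException("connection reset"))))
  }
}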