Skip to content

Commit a00bc38

Browse files
committed
Moved push of data out of ShuffleWriter and addressed other minor comments
1 parent 74f1f9f commit a00bc38

File tree

6 files changed

+513
-506
lines changed

6 files changed

+513
-506
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.apache.spark.metrics.source.JVMCPUSource
4646
import org.apache.spark.resource.ResourceInformation
4747
import org.apache.spark.rpc.RpcTimeout
4848
import org.apache.spark.scheduler._
49-
import org.apache.spark.shuffle.{FetchFailedException, ShuffleWriter}
49+
import org.apache.spark.shuffle.{FetchFailedException, PushShuffleSupport}
5050
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
5151
import org.apache.spark.util._
5252
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -318,7 +318,7 @@ private[spark] class Executor(
318318
case NonFatal(e) =>
319319
logWarning("Unable to stop heartbeater", e)
320320
}
321-
ShuffleWriter.stop
321+
PushShuffleSupport.stop()
322322
threadPool.shutdown()
323323

324324
// Notify plugins that executor is shutting down so they can terminate cleanly

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1941,16 +1941,16 @@ package object config {
19411941

19421942
private[spark] val PUSH_SHUFFLE_ENABLED =
19431943
ConfigBuilder("spark.shuffle.push.enabled")
1944-
.doc("Set to 'true' to enable push based shuffle")
1944+
.doc("Set to 'true' to enable push based shuffle.")
19451945
.version("3.1.0")
19461946
.booleanConf
19471947
.createWithDefault(false)
19481948

19491949
private[spark] val PUSH_SHUFFLE_NUM_PUSH_THREADS =
19501950
ConfigBuilder("spark.shuffle.push.numPushThreads")
1951-
.doc("Specify the number of threads in the block pusher pool. These threads assist " +
1951+
.doc("Specify the number of threads in the block pusher pool. These threads assist " +
19521952
"in creating connections and pushing blocks to remote shuffle services when push based " +
1953-
"shuffle is enabled. By default, the threadpool size is equal to the number of cores")
1953+
"shuffle is enabled. By default, the threadpool size is equal to the number of cores.")
19541954
.version("3.1.0")
19551955
.intConf
19561956
.createOptional

0 commit comments

Comments
 (0)