From 60889d2babec9bee3a0cd614f86999e9d3700cd2 Mon Sep 17 00:00:00 2001 From: schintap Date: Wed, 30 Jan 2019 10:14:14 -0500 Subject: [PATCH 1/3] Fix ChunkFetchIntegrationSuite --- .../main/java/org/apache/spark/network/TransportContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 1a3f3f2a6f24..94ad37b0ab01 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -88,7 +88,7 @@ public class TransportContext { // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling // max number of TransportServer worker threads that are blocked on writing response // of ChunkFetchRequest message back to the client via the underlying channel. - private static EventLoopGroup chunkFetchWorkers; + private EventLoopGroup chunkFetchWorkers; public TransportContext(TransportConf conf, RpcHandler rpcHandler) { this(conf, rpcHandler, false, false); From 657dff2c54495d34427a73f7e3e46066e7cceb27 Mon Sep 17 00:00:00 2001 From: schintap Date: Thu, 31 Jan 2019 08:33:47 -0500 Subject: [PATCH 2/3] Remove synchronize block around shuffle server transport context initialization --- .../apache/spark/network/TransportContext.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 94ad37b0ab01..c991cda51c09 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -122,16 +122,14 @@ public TransportContext( this.closeIdleConnections = closeIdleConnections; this.isClientOnly = isClientOnly; - synchronized(TransportContext.class) { - if (chunkFetchWorkers == null && - conf.getModuleName() != null && - conf.getModuleName().equalsIgnoreCase("shuffle") && - !isClientOnly) { - chunkFetchWorkers = NettyUtils.createEventLoop( - IOMode.valueOf(conf.ioMode()), - conf.chunkFetchHandlerThreads(), - "shuffle-chunk-fetch-handler"); - } + if (chunkFetchWorkers == null && + conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle") && + !isClientOnly) { + chunkFetchWorkers = NettyUtils.createEventLoop( + IOMode.valueOf(conf.ioMode()), + conf.chunkFetchHandlerThreads(), + "shuffle-chunk-fetch-handler"); } } From 3f0eb607556efa55af70eb6f012a7cf0e3119c61 Mon Sep 17 00:00:00 2001 From: schintap Date: Thu, 31 Jan 2019 09:24:28 -0500 Subject: [PATCH 3/3] Make event loop group final --- .../java/org/apache/spark/network/TransportContext.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index c991cda51c09..0bc5dd5402d5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -88,7 +88,7 @@ public class TransportContext { // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling // max number of TransportServer worker threads that are blocked on writing response // of ChunkFetchRequest message back to the client via the underlying channel. - private EventLoopGroup chunkFetchWorkers; + private final EventLoopGroup chunkFetchWorkers; public TransportContext(TransportConf conf, RpcHandler rpcHandler) { this(conf, rpcHandler, false, false); @@ -122,14 +122,15 @@ public TransportContext( this.closeIdleConnections = closeIdleConnections; this.isClientOnly = isClientOnly; - if (chunkFetchWorkers == null && - conf.getModuleName() != null && + if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") && !isClientOnly) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), "shuffle-chunk-fetch-handler"); + } else { + chunkFetchWorkers = null; } }