Skip to content

Commit c67115a

Browse files
wakunGitHub Enterprise
authored andcommitted
[CARMEL-6222]Improve congestion in shuffle service (#1068)
* [CARMEL-6222]Improve congestion in shuffle service
1 parent 6b6221e commit c67115a

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,8 @@ public Meter getChunkFetchRequestHandleSuccessRate() {
333333
public Meter getChunkFetchRequestHandleFailRate() {
334334
return chunkFetchRequestHandleFailRate;
335335
}
336+
337+
public long getChunksBeingTransferred() {
338+
return rpcHandler.getStreamManager().chunksBeingTransferred();
339+
}
336340
}

common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestCongestionHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ private String findHeaviestShuffle() {
166166
* HIGH_CONGESTION --> NO_CONGESTION
167167
*/
168168
private void checkCongestion() {
169-
int requestPendingTasks = transportContext.getPendingTasks();
169+
int requestPendingTasks =
170+
transportContext.getPendingTasks() + (int)streamManager.chunksBeingTransferred();
170171
CongestionMode prevMode = congestionMode;
171172
if (requestPendingTasks <= lowWaterMark) {
172173
congestionMode = CongestionMode.NO;

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ protected void serviceInit(Configuration conf) throws Exception {
207207
blockHandler.getAllMetrics().getMetrics().put("numNettyIOPendingTasks",
208208
(Gauge) () -> shuffleServer.getPendingTasks());
209209

210+
blockHandler.getAllMetrics().getMetrics().put("chunksBeingTransferred",
211+
(Gauge) () -> transportContext.getChunksBeingTransferred());
212+
210213
blockHandler.getAllMetrics().getMetrics().put("numNettyChunkFetchPendingTasks",
211214
(Gauge) () -> transportContext.getPendingTasks());
212215

0 commit comments

Comments
 (0)