Skip to content

Commit edcd9fb

Browse files
zsxwingcloud-fan
authored andcommitted
[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-*
## What changes were proposed in this pull request? Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`. ## How was this patch tested? Jenkins. Author: Shixiong Zhu <[email protected]> Closes #18593 from zsxwing/SPARK-21369. (cherry picked from commit 833eab2) Signed-off-by: Wenchen Fan <[email protected]>
1 parent a05edf4 commit edcd9fb

File tree

5 files changed

+19
-23
lines changed

5 files changed

+19
-23
lines changed

common/network-common/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@
9090
<dependency>
9191
<groupId>org.apache.spark</groupId>
9292
<artifactId>spark-tags_${scala.binary.version}</artifactId>
93-
</dependency>
93+
<scope>test</scope>
94+
</dependency>
9495

9596
<!--
9697
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.util.concurrent.ConcurrentLinkedQueue;
2525
import java.util.concurrent.atomic.AtomicLong;
2626

27-
import scala.Tuple2;
28-
2927
import com.google.common.annotations.VisibleForTesting;
3028
import io.netty.channel.Channel;
29+
import org.apache.commons.lang3.tuple.ImmutablePair;
30+
import org.apache.commons.lang3.tuple.Pair;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

@@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
5858

5959
private final Map<Long, RpcResponseCallback> outstandingRpcs;
6060

61-
private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
61+
private final Queue<Pair<String, StreamCallback>> streamCallbacks;
6262
private volatile boolean streamActive;
6363

6464
/** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
@@ -92,7 +92,7 @@ public void removeRpcRequest(long requestId) {
9292

9393
public void addStreamCallback(String streamId, StreamCallback callback) {
9494
timeOfLastRequestNs.set(System.nanoTime());
95-
streamCallbacks.offer(new Tuple2<>(streamId, callback));
95+
streamCallbacks.offer(ImmutablePair.of(streamId, callback));
9696
}
9797

9898
@VisibleForTesting
@@ -119,9 +119,9 @@ private void failOutstandingRequests(Throwable cause) {
119119
logger.warn("RpcResponseCallback.onFailure throws exception", e);
120120
}
121121
}
122-
for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
122+
for (Pair<String, StreamCallback> entry : streamCallbacks) {
123123
try {
124-
entry._2().onFailure(entry._1(), cause);
124+
entry.getValue().onFailure(entry.getKey(), cause);
125125
} catch (Exception e) {
126126
logger.warn("StreamCallback.onFailure throws exception", e);
127127
}
@@ -208,9 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
208208
}
209209
} else if (message instanceof StreamResponse) {
210210
StreamResponse resp = (StreamResponse) message;
211-
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
211+
Pair<String, StreamCallback> entry = streamCallbacks.poll();
212212
if (entry != null) {
213-
StreamCallback callback = entry._2();
213+
StreamCallback callback = entry.getValue();
214214
if (resp.byteCount > 0) {
215215
StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
216216
callback);
@@ -235,9 +235,9 @@ public void handle(ResponseMessage message) throws Exception {
235235
}
236236
} else if (message instanceof StreamFailure) {
237237
StreamFailure resp = (StreamFailure) message;
238-
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
238+
Pair<String, StreamCallback> entry = streamCallbacks.poll();
239239
if (entry != null) {
240-
StreamCallback callback = entry._2();
240+
StreamCallback callback = entry.getValue();
241241
try {
242242
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
243243
} catch (IOException ioe) {

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.atomic.AtomicLong;
2525

26-
import scala.Tuple2;
27-
2826
import com.google.common.base.Preconditions;
2927
import io.netty.channel.Channel;
3028
import org.slf4j.Logger;
@@ -98,21 +96,16 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
9896

9997
@Override
10098
public ManagedBuffer openStream(String streamChunkId) {
101-
Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
102-
return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
103-
}
104-
105-
public static String genStreamChunkId(long streamId, int chunkId) {
106-
return String.format("%d_%d", streamId, chunkId);
107-
}
108-
109-
public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
11099
String[] array = streamChunkId.split("_");
111100
assert array.length == 2:
112101
"Stream id and chunk index should be specified when open stream for fetching block.";
113102
long streamId = Long.valueOf(array[0]);
114103
int chunkIndex = Integer.valueOf(array[1]);
115-
return new Tuple2<>(streamId, chunkIndex);
104+
return getChunk(streamId, chunkIndex);
105+
}
106+
107+
public static String genStreamChunkId(long streamId, int chunkId) {
108+
return String.format("%d_%d", streamId, chunkId);
116109
}
117110

118111
@Override

common/network-shuffle/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
<dependency>
7070
<groupId>org.apache.spark</groupId>
7171
<artifactId>spark-tags_${scala.binary.version}</artifactId>
72+
<scope>test</scope>
7273
</dependency>
7374

7475
<!--

common/network-yarn/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
<dependency>
4949
<groupId>org.apache.spark</groupId>
5050
<artifactId>spark-tags_${scala.binary.version}</artifactId>
51+
<scope>test</scope>
5152
</dependency>
5253

5354
<!--

0 commit comments

Comments
 (0)