|
44 | 44 | import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; |
45 | 45 | import org.apache.spark.network.util.TransportConf; |
46 | 46 |
|
47 | | - |
48 | 47 | /** |
49 | 48 | * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process. |
50 | 49 | * |
@@ -91,26 +90,8 @@ protected void handleMessage( |
91 | 90 | try { |
92 | 91 | OpenBlocks msg = (OpenBlocks) msgObj; |
93 | 92 | checkAuth(client, msg.appId); |
94 | | - |
95 | | - Iterator<ManagedBuffer> iter = new Iterator<ManagedBuffer>() { |
96 | | - private int index = 0; |
97 | | - |
98 | | - @Override |
99 | | - public boolean hasNext() { |
100 | | - return index < msg.blockIds.length; |
101 | | - } |
102 | | - |
103 | | - @Override |
104 | | - public ManagedBuffer next() { |
105 | | - final ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, |
106 | | - msg.blockIds[index]); |
107 | | - index++; |
108 | | - metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); |
109 | | - return block; |
110 | | - } |
111 | | - }; |
112 | | - |
113 | | - long streamId = streamManager.registerStream(client.getClientId(), iter); |
| 93 | + long streamId = streamManager.registerStream(client.getClientId(), |
| 94 | + new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds)); |
114 | 95 | if (logger.isTraceEnabled()) { |
115 | 96 | logger.trace("Registered streamId {} with {} buffers for client {} from host {}", |
116 | 97 | streamId, |
@@ -209,4 +190,51 @@ public Map<String, Metric> getMetrics() { |
209 | 190 | } |
210 | 191 | } |
211 | 192 |
|
| 193 | + private class ManagedBufferIterator implements Iterator<ManagedBuffer> { |
| 194 | + |
| 195 | + private int index = 0; |
| 196 | + private final String appId; |
| 197 | + private final String execId; |
| 198 | + private final int shuffleId; |
| 199 | + // An array containing mapId and reduceId pairs. |
| 200 | + private final int[] mapIdAndReduceIds; |
| 201 | + |
| 202 | + ManagedBufferIterator(String appId, String execId, String[] blockIds) { |
| 203 | + this.appId = appId; |
| 204 | + this.execId = execId; |
| 205 | + String[] blockId0Parts = blockIds[0].split("_"); |
| 206 | + if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) { |
| 207 | + throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]); |
| 208 | + } |
| 209 | + this.shuffleId = Integer.parseInt(blockId0Parts[1]); |
| 210 | + mapIdAndReduceIds = new int[2 * blockIds.length]; |
| 211 | + for (int i = 0; i < blockIds.length; i++) { |
| 212 | + String[] blockIdParts = blockIds[i].split("_"); |
| 213 | + if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) { |
| 214 | + throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]); |
| 215 | + } |
| 216 | + if (Integer.parseInt(blockIdParts[1]) != shuffleId) { |
| 217 | + throw new IllegalArgumentException("Expected shuffleId=" + shuffleId + |
| 218 | + ", got:" + blockIds[i]); |
| 219 | + } |
| 220 | + mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]); |
| 221 | + mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]); |
| 222 | + } |
| 223 | + } |
| 224 | + |
| 225 | + @Override |
| 226 | + public boolean hasNext() { |
| 227 | + return index < mapIdAndReduceIds.length; |
| 228 | + } |
| 229 | + |
| 230 | + @Override |
| 231 | + public ManagedBuffer next() { |
| 232 | + final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId, |
| 233 | + mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]); |
| 234 | + index += 2; |
| 235 | + metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); |
| 236 | + return block; |
| 237 | + } |
| 238 | + } |
| 239 | + |
212 | 240 | } |
0 commit comments