Skip to content

Commit effd0ed

Browse files
author
Jacky Lee
committed
2 parents 69bbbd5 + 7014ba1 commit effd0ed

File tree

160 files changed

+4997
-1285
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

160 files changed

+4997
-1285
lines changed

.github/workflows/build_and_test.yml

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ jobs:
111111
key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-${{ hashFiles('**/pom.xml') }}
112112
restore-keys: |
113113
${{ matrix.java }}-${{ matrix.hadoop }}-maven-
114-
- name: Cache Ivy local repository
114+
- name: Cache Coursier local repository
115115
uses: actions/cache@v2
116116
with:
117-
path: ~/.ivy2/cache
118-
key: ${{ matrix.java }}-${{ matrix.hadoop }}-ivy-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
117+
path: ~/.cache/coursier
118+
key: ${{ matrix.java }}-${{ matrix.hadoop }}-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
119119
restore-keys: |
120-
${{ matrix.java }}-${{ matrix.hadoop }}-ivy-
120+
${{ matrix.java }}-${{ matrix.hadoop }}-coursier-
121121
- name: Install JDK ${{ matrix.java }}
122122
uses: actions/setup-java@v1
123123
with:
@@ -206,13 +206,13 @@ jobs:
206206
key: pyspark-maven-${{ hashFiles('**/pom.xml') }}
207207
restore-keys: |
208208
pyspark-maven-
209-
- name: Cache Ivy local repository
209+
- name: Cache Coursier local repository
210210
uses: actions/cache@v2
211211
with:
212-
path: ~/.ivy2/cache
213-
key: pyspark-ivy-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
212+
path: ~/.cache/coursier
213+
key: pyspark-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
214214
restore-keys: |
215-
pyspark-ivy-
215+
pyspark-coursier-
216216
- name: Install Python 3.6
217217
uses: actions/setup-python@v2
218218
with:
@@ -282,13 +282,13 @@ jobs:
282282
key: sparkr-maven-${{ hashFiles('**/pom.xml') }}
283283
restore-keys: |
284284
sparkr-maven-
285-
- name: Cache Ivy local repository
285+
- name: Cache Coursier local repository
286286
uses: actions/cache@v2
287287
with:
288-
path: ~/.ivy2/cache
289-
key: sparkr-ivy-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
288+
path: ~/.cache/coursier
289+
key: sparkr-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
290290
restore-keys: |
291-
sparkr-ivy-
291+
sparkr-coursier-
292292
- name: Run tests
293293
run: |
294294
mkdir -p ~/.m2
@@ -404,13 +404,13 @@ jobs:
404404
steps:
405405
- name: Checkout Spark repository
406406
uses: actions/checkout@v2
407-
- name: Cache Ivy local repository
407+
- name: Cache Coursier local repository
408408
uses: actions/cache@v2
409409
with:
410-
path: ~/.ivy2/cache
411-
key: scala-213-ivy-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
410+
path: ~/.cache/coursier
411+
key: scala-213-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
412412
restore-keys: |
413-
scala-213-ivy-
413+
scala-213-coursier-
414414
- name: Install Java 11
415415
uses: actions/setup-java@v1
416416
with:

R/pkg/inst/profile/shell.R

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,7 @@
4343
cat(" /_/", "\n")
4444
cat("\n")
4545

46-
cat("\nSparkSession available as 'spark'.\n")
46+
cat("\nSparkSession Web UI available at", SparkR::sparkR.uiWebUrl())
47+
cat("\nSparkSession available as 'spark'(master = ", unlist(SparkR::sparkR.conf("spark.master")),
48+
", app id = ", unlist(SparkR::sparkR.conf("spark.app.id")), ").", "\n", sep = "")
4749
}

common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.network.protocol;
1919

2020
import java.io.IOException;
21+
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223

2324
import io.netty.buffer.ByteBuf;
@@ -46,7 +47,11 @@ public static String decode(ByteBuf buf) {
4647
}
4748
}
4849

49-
/** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
50+
/**
51+
* Bitmaps are encoded with their serialization length followed by the serialization bytes.
52+
*
53+
* @since 3.1.0
54+
*/
5055
public static class Bitmaps {
5156
public static int encodedLength(RoaringBitmap b) {
5257
// Compress the bitmap before serializing it. Note that since BlockTransferMessage
@@ -57,13 +62,20 @@ public static int encodedLength(RoaringBitmap b) {
5762
return b.serializedSizeInBytes();
5863
}
5964

65+
/**
66+
* The input ByteBuf for this encoder should have enough write capacity to fit the serialized
67+
* bitmap. Other encoders which use {@link io.netty.buffer.AbstractByteBuf#writeBytes(byte[])}
68+
* to write can expand the buf as writeBytes calls {@link ByteBuf#ensureWritable} internally.
69+
* However, this encoder doesn't rely on netty's writeBytes and will fail if the input buf
70+
* doesn't have enough write capacity.
71+
*/
6072
public static void encode(ByteBuf buf, RoaringBitmap b) {
61-
int encodedLength = b.serializedSizeInBytes();
6273
// RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
6374
// ByteBuffer. Here, we need to explicitly manage the index so we can write into the
6475
// ByteBuffer, and the write is reflected in the underlying ByteBuf.
65-
b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
66-
buf.writerIndex(buf.writerIndex() + encodedLength);
76+
ByteBuffer byteBuffer = buf.nioBuffer(buf.writerIndex(), buf.writableBytes());
77+
b.serialize(byteBuffer);
78+
buf.writerIndex(buf.writerIndex() + byteBuffer.position());
6779
}
6880

6981
public static RoaringBitmap decode(ByteBuf buf) {
@@ -172,7 +184,11 @@ public static long[] decode(ByteBuf buf) {
172184
}
173185
}
174186

175-
/** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
187+
/**
188+
* Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding.
189+
*
190+
* @since 3.1.0
191+
*/
176192
public static class BitmapArrays {
177193
public static int encodedLength(RoaringBitmap[] bitmaps) {
178194
int totalLength = 4;

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,4 +363,39 @@ public boolean useOldFetchProtocol() {
363363
return conf.getBoolean("spark.shuffle.useOldFetchProtocol", false);
364364
}
365365

366+
/**
367+
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
368+
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
369+
* a cluster level because this configuration is set to
370+
* 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
371+
* To turn on push-based shuffle at a cluster level, set the configuration to
372+
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
373+
*/
374+
public String mergedShuffleFileManagerImpl() {
375+
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
376+
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
377+
}
378+
379+
/**
380+
* The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during
381+
* push-based shuffle.
382+
* A merged shuffle file consists of multiple small shuffle blocks. Fetching the
383+
* complete merged shuffle file in a single response increases the memory requirements for the
384+
* clients. Instead of serving the entire merged file, the shuffle service serves the
385+
* merged file in `chunks`. A `chunk` consists of a few shuffle blocks in their entirety, and this
386+
* configuration controls how big a chunk can get. A corresponding index file for each merged
387+
* shuffle file will be generated indicating chunk boundaries.
388+
*/
389+
public int minChunkSizeInMergedShuffleFile() {
390+
return Ints.checkedCast(JavaUtils.byteStringAsBytes(
391+
conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
392+
}
393+
394+
/**
395+
* The size of the in-memory cache used in push-based shuffle for storing merged index files.
396+
*/
397+
public long mergedIndexCacheSize() {
398+
return JavaUtils.byteStringAsBytes(
399+
conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
400+
}
366401
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.protocol;
19+
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.buffer.Unpooled;
22+
import org.junit.Test;
23+
import org.roaringbitmap.RoaringBitmap;
24+
25+
import static org.junit.Assert.*;
26+
27+
/**
28+
* Tests for {@link Encoders}.
29+
*/
30+
public class EncodersSuite {
31+
32+
@Test
33+
public void testRoaringBitmapEncodeDecode() {
34+
RoaringBitmap bitmap = new RoaringBitmap();
35+
bitmap.add(1, 2, 3);
36+
ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
37+
Encoders.Bitmaps.encode(buf, bitmap);
38+
RoaringBitmap decodedBitmap = Encoders.Bitmaps.decode(buf);
39+
assertEquals(bitmap, decodedBitmap);
40+
}
41+
42+
@Test (expected = java.nio.BufferOverflowException.class)
43+
public void testRoaringBitmapEncodeShouldFailWhenBufferIsSmall() {
44+
RoaringBitmap bitmap = new RoaringBitmap();
45+
bitmap.add(1, 2, 3);
46+
ByteBuf buf = Unpooled.buffer(4);
47+
Encoders.Bitmaps.encode(buf, bitmap);
48+
}
49+
50+
@Test
51+
public void testBitmapArraysEncodeDecode() {
52+
RoaringBitmap[] bitmaps = new RoaringBitmap[] {
53+
new RoaringBitmap(),
54+
new RoaringBitmap(),
55+
new RoaringBitmap(), // empty
56+
new RoaringBitmap(),
57+
new RoaringBitmap()
58+
};
59+
bitmaps[0].add(1, 2, 3);
60+
bitmaps[1].add(1, 2, 4);
61+
bitmaps[3].add(7L, 9L);
62+
bitmaps[4].add(1L, 100L);
63+
ByteBuf buf = Unpooled.buffer(Encoders.BitmapArrays.encodedLength(bitmaps));
64+
Encoders.BitmapArrays.encode(buf, bitmaps);
65+
RoaringBitmap[] decodedBitmaps = Encoders.BitmapArrays.decode(buf);
66+
assertArrayEquals(bitmaps, decodedBitmaps);
67+
}
68+
}

common/network-shuffle/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
<artifactId>metrics-core</artifactId>
4848
</dependency>
4949

50+
<dependency>
51+
<groupId>org.apache.spark</groupId>
52+
<artifactId>spark-tags_${scala.binary.version}</artifactId>
53+
</dependency>
54+
5055
<!-- Provided dependencies -->
5156
<dependency>
5257
<groupId>org.slf4j</groupId>
@@ -70,11 +75,6 @@
7075
<type>test-jar</type>
7176
<scope>test</scope>
7277
</dependency>
73-
<dependency>
74-
<groupId>org.apache.spark</groupId>
75-
<artifactId>spark-tags_${scala.binary.version}</artifactId>
76-
<scope>test</scope>
77-
</dependency>
7878

7979
<!--
8080
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121

2222
import com.google.common.base.Throwables;
2323

24+
import org.apache.spark.annotation.Evolving;
25+
2426
/**
2527
* Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
2628
* and logged.
2729
* Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
2830
* - remaining retries < max retries
2931
* - exception is an IOException
32+
*
33+
* @since 3.1.0
3034
*/
31-
35+
@Evolving
3236
public interface ErrorHandler {
3337

3438
boolean shouldRetryError(Throwable t);
@@ -44,6 +48,8 @@ default boolean shouldLogError(Throwable t) {
4448

4549
/**
4650
* The error handler for pushing shuffle blocks to remote shuffle services.
51+
*
52+
* @since 3.1.0
4753
*/
4854
class BlockPushErrorHandler implements ErrorHandler {
4955
/**

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
6868
throws IOException {
6969
this(new OneForOneStreamManager(),
7070
new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
71-
new NoOpMergedShuffleFileManager());
71+
new NoOpMergedShuffleFileManager(conf));
7272
}
7373

7474
public ExternalBlockHandler(
@@ -89,7 +89,7 @@ public ExternalShuffleBlockResolver getBlockResolver() {
8989
public ExternalBlockHandler(
9090
OneForOneStreamManager streamManager,
9191
ExternalShuffleBlockResolver blockManager) {
92-
this(streamManager, blockManager, new NoOpMergedShuffleFileManager());
92+
this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null));
9393
}
9494

9595
/** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
@@ -175,7 +175,7 @@ protected void handleMessage(
175175
RegisterExecutor msg = (RegisterExecutor) msgObj;
176176
checkAuth(client, msg.appId);
177177
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
178-
mergeManager.registerExecutor(msg.appId, msg.executorInfo.localDirs);
178+
mergeManager.registerExecutor(msg.appId, msg.executorInfo);
179179
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
180180
} finally {
181181
responseDelayContext.stop();
@@ -232,6 +232,7 @@ public StreamManager getStreamManager() {
232232
*/
233233
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
234234
blockManager.applicationRemoved(appId, cleanupLocalDirs);
235+
mergeManager.applicationRemoved(appId, cleanupLocalDirs);
235236
}
236237

237238
/**
@@ -430,8 +431,15 @@ public ManagedBuffer next() {
430431
/**
431432
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
432433
* is not enabled.
434+
*
435+
* @since 3.1.0
433436
*/
434-
private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
437+
public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
438+
439+
// This constructor is needed because it is used to instantiate an implementation
440+
// of MergedShuffleFileManager using reflection.
441+
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
442+
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
435443

436444
@Override
437445
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
@@ -444,18 +452,13 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
444452
}
445453

446454
@Override
447-
public void registerApplication(String appId, String user) {
448-
// No-op. Do nothing.
449-
}
450-
451-
@Override
452-
public void registerExecutor(String appId, String[] localDirs) {
455+
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
453456
// No-Op. Do nothing.
454457
}
455458

456459
@Override
457460
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
458-
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
461+
// No-Op. Do nothing.
459462
}
460463

461464
@Override

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
* 1. Number of chunks in a merged shuffle block.
3535
* 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
3636
* merged to that merged block chunk.
37+
*
38+
* @since 3.1.0
3739
*/
3840
public class MergedBlockMeta {
3941
private final int numChunks;

0 commit comments

Comments
 (0)