Skip to content

Commit 4c42986

Browse files
aarondavpwendell
authored andcommitted
[SPARK-4242] [Core] Add SASL to external shuffle service
Does three things: (1) Adds SASL to ExternalShuffleClient, (2) puts SecurityManager in BlockManager's constructor, and (3) adds unit test. Author: Aaron Davidson <[email protected]> Closes #3108 from aarondav/sasl-client and squashes the following commits: 48b622d [Aaron Davidson] Screw it, let's just get LimitedInputStream 3543b70 [Aaron Davidson] Back out of pom change due to unknown test issue? b58518a [Aaron Davidson] ByteStreams.limit() not available :( cbe451a [Aaron Davidson] Address comments 2bf2908 [Aaron Davidson] [SPARK-4242] [Core] Add SASL to external shuffle service
1 parent 5b3b6f6 commit 4c42986

File tree

15 files changed

+272
-23
lines changed

15 files changed

+272
-23
lines changed

LICENSE

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ SUCH DAMAGE.
754754

755755

756756
========================================================================
757-
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
757+
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
758758
========================================================================
759759
Copyright (C) 2008 The Android Open Source Project
760760

@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
771771
limitations under the License.
772772

773773

774+
========================================================================
775+
For LimitedInputStream
776+
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
777+
========================================================================
778+
Copyright (C) 2007 The Guava Authors
779+
780+
Licensed under the Apache License, Version 2.0 (the "License");
781+
you may not use this file except in compliance with the License.
782+
You may obtain a copy of the License at
783+
784+
http://www.apache.org/licenses/LICENSE-2.0
785+
786+
Unless required by applicable law or agreed to in writing, software
787+
distributed under the License is distributed on an "AS IS" BASIS,
788+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
789+
See the License for the specific language governing permissions and
790+
limitations under the License.
791+
792+
774793
========================================================================
775794
BSD-style licenses
776795
========================================================================

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ object SparkEnv extends Logging {
287287

288288
// NB: blockManager is not valid until initialize() is called later.
289289
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
290-
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
290+
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
291291

292292
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
293293

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ private[spark] class BlockManager(
7272
val conf: SparkConf,
7373
mapOutputTracker: MapOutputTracker,
7474
shuffleManager: ShuffleManager,
75-
blockTransferService: BlockTransferService)
75+
blockTransferService: BlockTransferService,
76+
securityManager: SecurityManager)
7677
extends BlockDataManager with Logging {
7778

7879
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -115,7 +116,8 @@ private[spark] class BlockManager(
115116
// Client to read other executors' shuffle files. This is either an external service, or just the
116117
// standard BlockTranserService to directly connect to other Executors.
117118
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
118-
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf))
119+
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
120+
securityManager.isAuthenticationEnabled())
119121
} else {
120122
blockTransferService
121123
}
@@ -166,9 +168,10 @@ private[spark] class BlockManager(
166168
conf: SparkConf,
167169
mapOutputTracker: MapOutputTracker,
168170
shuffleManager: ShuffleManager,
169-
blockTransferService: BlockTransferService) = {
171+
blockTransferService: BlockTransferService,
172+
securityManager: SecurityManager) = {
170173
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
171-
conf, mapOutputTracker, shuffleManager, blockTransferService)
174+
conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
172175
}
173176

174177
/**
@@ -219,7 +222,6 @@ private[spark] class BlockManager(
219222
return
220223
} catch {
221224
case e: Exception if i < MAX_ATTEMPTS =>
222-
val attemptsRemaining =
223225
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}}"
224226
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
225227
Thread.sleep(SLEEP_TIME_SECS * 1000)

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
6262
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
6363
val transfer = new NioBlockTransferService(conf, securityMgr)
6464
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
65-
mapOutputTracker, shuffleManager, transfer)
65+
mapOutputTracker, shuffleManager, transfer, securityMgr)
6666
store.initialize("app-id")
6767
allStores += store
6868
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
263263
when(failableTransfer.hostName).thenReturn("some-hostname")
264264
when(failableTransfer.port).thenReturn(1000)
265265
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
266-
10000, conf, mapOutputTracker, shuffleManager, failableTransfer)
266+
10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
267267
failableStore.initialize("app-id")
268268
allStores += failableStore // so that this gets stopped after test
269269
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
7474
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
7575
val transfer = new NioBlockTransferService(conf, securityMgr)
7676
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
77-
mapOutputTracker, shuffleManager, transfer)
77+
mapOutputTracker, shuffleManager, transfer, securityMgr)
7878
manager.initialize("app-id")
7979
manager
8080
}
@@ -795,7 +795,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
795795
// Use Java serializer so we can create an unserializable error.
796796
val transfer = new NioBlockTransferService(conf, securityMgr)
797797
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
798-
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
798+
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
799799

800800
// The put should fail since a1 is not serializable.
801801
class UnserializableClass

network/common/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<dependency>
5151
<groupId>com.google.guava</groupId>
5252
<artifactId>guava</artifactId>
53+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5354
<scope>provided</scope>
5455
</dependency>
5556

network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.netty.channel.DefaultFileRegion;
3131

3232
import org.apache.spark.network.util.JavaUtils;
33+
import org.apache.spark.network.util.LimitedInputStream;
3334

3435
/**
3536
* A {@link ManagedBuffer} backed by a segment in a file.
@@ -101,7 +102,7 @@ public InputStream createInputStream() throws IOException {
101102
try {
102103
is = new FileInputStream(file);
103104
ByteStreams.skipFully(is, offset);
104-
return ByteStreams.limit(is, length);
105+
return new LimitedInputStream(is, length);
105106
} catch (IOException e) {
106107
try {
107108
if (is != null) {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.util;
19+
20+
import java.io.FilterInputStream;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
24+
import com.google.common.base.Preconditions;
25+
26+
/**
27+
* Wraps a {@link InputStream}, limiting the number of bytes which can be read.
28+
*
29+
* This code is from Guava's 14.0 source code, because there is no compatible way to
30+
* use this functionality in both a Guava 11 environment and a Guava >14 environment.
31+
*/
32+
public final class LimitedInputStream extends FilterInputStream {
33+
private long left;
34+
private long mark = -1;
35+
36+
public LimitedInputStream(InputStream in, long limit) {
37+
super(in);
38+
Preconditions.checkNotNull(in);
39+
Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
40+
left = limit;
41+
}
42+
@Override public int available() throws IOException {
43+
return (int) Math.min(in.available(), left);
44+
}
45+
// it's okay to mark even if mark isn't supported, as reset won't work
46+
@Override public synchronized void mark(int readLimit) {
47+
in.mark(readLimit);
48+
mark = left;
49+
}
50+
@Override public int read() throws IOException {
51+
if (left == 0) {
52+
return -1;
53+
}
54+
int result = in.read();
55+
if (result != -1) {
56+
--left;
57+
}
58+
return result;
59+
}
60+
@Override public int read(byte[] b, int off, int len) throws IOException {
61+
if (left == 0) {
62+
return -1;
63+
}
64+
len = (int) Math.min(len, left);
65+
int result = in.read(b, off, len);
66+
if (result != -1) {
67+
left -= result;
68+
}
69+
return result;
70+
}
71+
@Override public synchronized void reset() throws IOException {
72+
if (!in.markSupported()) {
73+
throw new IOException("Mark not supported");
74+
}
75+
if (mark == -1) {
76+
throw new IOException("Mark not set");
77+
}
78+
in.reset();
79+
left = mark;
80+
}
81+
@Override public long skip(long n) throws IOException {
82+
n = Math.min(n, left);
83+
long skipped = in.skip(n);
84+
left -= skipped;
85+
return skipped;
86+
}
87+
}

network/shuffle/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
<dependency>
5252
<groupId>com.google.guava</groupId>
5353
<artifactId>guava</artifactId>
54+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5455
<scope>provided</scope>
5556
</dependency>
5657

network/shuffle/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
126126
logger.trace("SASL client callback: setting realm");
127127
RealmCallback rc = (RealmCallback) callback;
128128
rc.setText(rc.getDefaultText());
129-
logger.info("Realm callback");
130129
} else if (callback instanceof RealmChoiceCallback) {
131130
// ignore (?)
132131
} else {

0 commit comments

Comments
 (0)