Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>dist</id>
Expand Down
8 changes: 2 additions & 6 deletions build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Preserve the calling directory
_CALLING_DIR="$(pwd)"
# Options used during compilation
_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is ReservedCodeCacheSize no longer applicable to java 8?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still in Java 8. I actually removed this because I think it's defunct and no longer needed, but I admit it's not strictly related to Java 8. I'd put it back if anyone has doubts, but if nobody can recall what it's for (I've never set it in dev or production) maybe it's removable now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was there to speed compilation up. Unless you want to do a controlled experiment to make sure it doesn't regress, I'd just leave it there.


# Installs any application tarball given a URL, the expected tarball name,
# and, optionally, a checkable binary path to determine if the binary has
Expand Down Expand Up @@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
ZINC_JAVA_HOME=
if [ -n "$JAVA_7_HOME" ]; then
ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
$ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
Expand Down
2 changes: 1 addition & 1 deletion build/sbt-launch-lib.bash
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ get_mem_opts () {
(( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))

echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
}

require_arg () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,40 +131,36 @@ public void setClientId(String id) {
*/
public void fetchChunk(
long streamId,
final int chunkIndex,
final ChunkReceivedCallback callback) {
final long startTime = System.currentTimeMillis();
int chunkIndex,
ChunkReceivedCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}

final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);

channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", streamChunkId,
getRemoteAddress(channel), timeTaken);
}
});
} else {
String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeFetchRequest(streamChunkId);
channel.close();
try {
callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
}

/**
Expand All @@ -175,8 +169,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
public void stream(final String streamId, final StreamCallback callback) {
final long startTime = System.currentTimeMillis();
public void stream(String streamId, StreamCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
Expand All @@ -186,29 +180,25 @@ public void stream(final String streamId, final StreamCallback callback) {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
channel.writeAndFlush(new StreamRequest(streamId)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request for {} to {} took {} ms", streamId,
getRemoteAddress(channel), timeTaken);
}
});
} else {
String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
channel.close();
try {
callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});
}
}

Expand All @@ -220,19 +210,17 @@ public void operationComplete(ChannelFuture future) throws Exception {
* @param callback Callback to handle the RPC's reply.
* @return The RPC's id.
*/
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
final long startTime = System.currentTimeMillis();
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);

channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
.addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
Expand All @@ -251,8 +239,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
}
});
});

return requestId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.Key;
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -37,7 +32,6 @@
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
Expand Down Expand Up @@ -103,20 +97,18 @@ public void doBootstrap(TransportClient client, Channel channel) {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {

AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
try {
String secretKey = secretKeyHolder.getSecretKey(authUser);
try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);

ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
conf.authRTTimeoutMs());
ByteBuffer responseData =
client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);

engine.validate(response);
engine.sessionCipher().addToChannel(channel);
} finally {
engine.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.spark.network.crypto;

import java.io.IOException;
import java.nio.ByteBuffer;
import javax.security.sasl.Sasl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
Expand All @@ -35,7 +33,6 @@
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -189,21 +187,16 @@ private void processOneWayMessage(OneWayMessage req) {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
private void respond(final Encodable result) {
final SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
}
private void respond(Encodable result) {
SocketAddress remoteAddress = channel.remoteAddress();
channel.writeAndFlush(result).addListener(future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error(String.format("Error sending result %s to %s; closing connection",
result, remoteAddress), future.cause());
channel.close();
}
);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.spark.network.crypto;

import java.util.Arrays;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,8 @@ private ShuffleMetrics() {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
@Override
public Integer getValue() {
return blockManager.getRegisteredExecutorsSize();
}
});
allMetrics.put("registeredExecutorsSize",
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);

// Execute the actual deletion in a different thread, as it may take some time.
directoryCleaner.execute(new Runnable() {
@Override
public void run() {
deleteExecutorDirs(executor.localDirs);
}
});
directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,19 @@ public void init(String appId) {

@Override
public void fetchBlocks(
final String host,
final int port,
final String execId,
String host,
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
new RetryingBlockFetcher.BlockFetchStarter() {
@Override
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException, InterruptedException {
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
}
};
new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
};

int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
Expand Down Expand Up @@ -131,12 +127,9 @@ public void registerWithShuffleServer(
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
TransportClient client = clientFactory.createUnmanagedClient(host, port);
try {
try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
} finally {
client.close();
}
}

Expand Down
Loading