diff --git a/assembly/pom.xml b/assembly/pom.xml index 53f18796e601..9d8607d9137c 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -187,6 +187,7 @@ org.apache.maven.plugins maven-assembly-plugin + 3.0.0 dist diff --git a/build/mvn b/build/mvn index 866bad892c75..1e393c331dd8 100755 --- a/build/mvn +++ b/build/mvn @@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # Preserve the calling directory _CALLING_DIR="$(pwd)" # Options used during compilation -_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" # Installs any application tarball given a URL, the expected tarball name, # and, optionally, a checkable binary path to determine if the binary has @@ -141,13 +141,9 @@ cd "${_CALLING_DIR}" # Now that zinc is ensured to be installed, check its status and, if its # not running or just installed, start it if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - ZINC_JAVA_HOME= - if [ -n "$JAVA_7_HOME" ]; then - ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME" - fi export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - $ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \ + "${ZINC_BIN}" -start -port ${ZINC_PORT} \ -scala-compiler "${SCALA_COMPILER}" \ -scala-library "${SCALA_LIBRARY}" &>/dev/null fi diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 615f84839465..4732669ee651 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -117,7 +117,7 @@ get_mem_opts () { (( $perm < 4096 )) || perm=4096 local codecache=$(( $perm / 2 )) - echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m" + echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m" } require_arg () { diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 7e7d78d42a8f..a6f527c11821 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -32,8 +32,6 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,40 +131,36 @@ public void setClientId(String id) { */ public void fetchChunk( long streamId, - final int chunkIndex, - final ChunkReceivedCallback callback) { - final long startTime = System.currentTimeMillis(); + int chunkIndex, + ChunkReceivedCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel)); } - final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex); + StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex); handler.addFetchRequest(streamChunkId, callback); - channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - long timeTaken = System.currentTimeMillis() - startTime; - if (logger.isTraceEnabled()) { - 
logger.trace("Sending request {} to {} took {} ms", streamChunkId, - getRemoteAddress(channel), timeTaken); - } - } else { - String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, - getRemoteAddress(channel), future.cause()); - logger.error(errorMsg, future.cause()); - handler.removeFetchRequest(streamChunkId); - channel.close(); - try { - callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); - } catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); - } - } + channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> { + if (future.isSuccess()) { + long timeTaken = System.currentTimeMillis() - startTime; + if (logger.isTraceEnabled()) { + logger.trace("Sending request {} to {} took {} ms", streamChunkId, + getRemoteAddress(channel), timeTaken); } - }); + } else { + String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, + getRemoteAddress(channel), future.cause()); + logger.error(errorMsg, future.cause()); + handler.removeFetchRequest(streamChunkId); + channel.close(); + try { + callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); + } catch (Exception e) { + logger.error("Uncaught exception in RPC response callback handler!", e); + } + } + }); } /** @@ -175,8 +169,8 @@ public void operationComplete(ChannelFuture future) throws Exception { * @param streamId The stream to fetch. * @param callback Object to call with the stream data. */ - public void stream(final String streamId, final StreamCallback callback) { - final long startTime = System.currentTimeMillis(); + public void stream(String streamId, StreamCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel)); } @@ -186,29 +180,25 @@ public void stream(final String streamId, final StreamCallback callback) { // when responses arrive. 
synchronized (this) { handler.addStreamCallback(callback); - channel.writeAndFlush(new StreamRequest(streamId)).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - long timeTaken = System.currentTimeMillis() - startTime; - if (logger.isTraceEnabled()) { - logger.trace("Sending request for {} to {} took {} ms", streamId, - getRemoteAddress(channel), timeTaken); - } - } else { - String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId, - getRemoteAddress(channel), future.cause()); - logger.error(errorMsg, future.cause()); - channel.close(); - try { - callback.onFailure(streamId, new IOException(errorMsg, future.cause())); - } catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); - } - } + channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> { + if (future.isSuccess()) { + long timeTaken = System.currentTimeMillis() - startTime; + if (logger.isTraceEnabled()) { + logger.trace("Sending request for {} to {} took {} ms", streamId, + getRemoteAddress(channel), timeTaken); } - }); + } else { + String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId, + getRemoteAddress(channel), future.cause()); + logger.error(errorMsg, future.cause()); + channel.close(); + try { + callback.onFailure(streamId, new IOException(errorMsg, future.cause())); + } catch (Exception e) { + logger.error("Uncaught exception in RPC response callback handler!", e); + } + } + }); } } @@ -220,19 +210,17 @@ public void operationComplete(ChannelFuture future) throws Exception { * @param callback Callback to handle the RPC's reply. * @return The RPC's id. */ - public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) { - final long startTime = System.currentTimeMillis(); + public long sendRpc(ByteBuffer message, RpcResponseCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isTraceEnabled()) { logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } - final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); handler.addRpcRequest(requestId, callback); - channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { + channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))) + .addListener(future -> { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { @@ -251,8 +239,7 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.error("Uncaught exception in RPC response callback handler!", e); } } - } - }); + }); return requestId; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java index 980525dbf04e..799f4540aa93 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -20,12 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; -import java.security.Key; -import 
javax.crypto.KeyGenerator; -import javax.crypto.Mac; -import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.sasl.SaslClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; -import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** @@ -103,20 +97,18 @@ public void doBootstrap(TransportClient client, Channel channel) { private void doSparkAuth(TransportClient client, Channel channel) throws GeneralSecurityException, IOException { - AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf); - try { + String secretKey = secretKeyHolder.getSecretKey(authUser); + try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) { ClientChallenge challenge = engine.challenge(); ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength()); challenge.encode(challengeData); - ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(), - conf.authRTTimeoutMs()); + ByteBuffer responseData = + client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs()); ServerResponse response = ServerResponse.decodeMessage(responseData); engine.validate(response); engine.sessionCipher().addToChannel(channel); - } finally { - engine.close(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 991d8ba95f5e..0a5c02994000 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -17,9 +17,7 @@ package org.apache.spark.network.crypto; -import java.io.IOException; import java.nio.ByteBuffer; -import javax.security.sasl.Sasl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SaslRpcHandler; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; -import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 900e8eb25540..8193bc137610 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -22,8 +22,6 @@ import com.google.common.base.Throwables; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -189,21 +187,16 @@ private void processOneWayMessage(OneWayMessage req) { * Responds to a single message with some Encodable object. If a failure occurs while sending, * it will be logged and the channel closed. 
*/ - private void respond(final Encodable result) { - final SocketAddress remoteAddress = channel.remoteAddress(); - channel.writeAndFlush(result).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - logger.trace("Sent result {} to client {}", result, remoteAddress); - } else { - logger.error(String.format("Error sending result %s to %s; closing connection", - result, remoteAddress), future.cause()); - channel.close(); - } - } + private void respond(Encodable result) { + SocketAddress remoteAddress = channel.remoteAddress(); + channel.writeAndFlush(result).addListener(future -> { + if (future.isSuccess()) { + logger.trace("Sent result {} to client {}", result, remoteAddress); + } else { + logger.error(String.format("Error sending result %s to %s; closing connection", + result, remoteAddress), future.cause()); + channel.close(); } - ); + }); } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java index 9a186f211312..a3519fe4a423 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java @@ -18,10 +18,8 @@ package org.apache.spark.network.crypto; import java.util.Arrays; -import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.collect.ImmutableMap; import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 6e02430a8edb..6daf9609d76d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -190,12 +190,8 @@ private ShuffleMetrics() { allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); - allMetrics.put("registeredExecutorsSize", new Gauge() { - @Override - public Integer getValue() { - return blockManager.getRegisteredExecutorsSize(); - } - }); + allMetrics.put("registeredExecutorsSize", + (Gauge) () -> blockManager.getRegisteredExecutorsSize()); } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 25e9abde708d..62d58aba4c1e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -205,12 +205,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length); // Execute the actual deletion in a different thread, as it may take some time. 
- directoryCleaner.execute(new Runnable() { - @Override - public void run() { - deleteExecutorDirs(executor.localDirs); - } - }); + directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs)); } } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 8c0c400966ed..2c5827bf7dc5 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -82,23 +82,19 @@ public void init(String appId) { @Override public void fetchBlocks( - final String host, - final int port, - final String execId, + String host, + int port, + String execId, String[] blockIds, BlockFetchingListener listener) { checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = - new RetryingBlockFetcher.BlockFetchStarter() { - @Override - public void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException, InterruptedException { + (blockIds1, listener1) -> { TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start(); - } - }; + new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start(); + }; int maxRetries = conf.maxIORetries(); if (maxRetries > 0) { @@ -131,12 +127,9 @@ public void registerWithShuffleServer( String execId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { checkInit(); - TransportClient client = clientFactory.createUnmanagedClient(host, port); - try { + try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) { ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer(); client.sendRpcSync(registerMessage, 5000 /* timeoutMs */); - } finally { - client.close(); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 5be855048e4d..f309dda8afca 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -164,12 +164,9 @@ private synchronized void initiateRetry() { logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms", retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime); - executorService.submit(new Runnable() { - @Override - public void run() { - Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); - fetchAllOutstanding(); - } + executorService.submit(() -> { + Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); + fetchAllOutstanding(); }); } diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index bcd26d4352b2..1356c4723b66 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -61,6 +61,7 @@ net.alchim31.maven scala-maven-plugin + 3.2.2 @@ -71,6 +72,7 @@ org.apache.maven.plugins maven-compiler-plugin + 3.6.1 diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index dc19f4ad5fb9..f03a4da5e715 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ 
-98,6 +98,7 @@ net.alchim31.maven scala-maven-plugin + 3.2.2 @@ -108,6 +109,7 @@ org.apache.maven.plugins maven-compiler-plugin + 3.6.1 diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 671b8c747594..f13c24ae5e01 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -162,14 +162,9 @@ public static ByteBuffer allocateDirectBuffer(int size) { constructor.setAccessible(true); Field cleanerField = cls.getDeclaredField("cleaner"); cleanerField.setAccessible(true); - final long memory = allocateMemory(size); + long memory = allocateMemory(size); ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size); - Cleaner cleaner = Cleaner.create(buffer, new Runnable() { - @Override - public void run() { - freeMemory(memory); - } - }); + Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory)); cleanerField.set(buffer, cleaner); return buffer; } catch (Exception e) { diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index fd6e95c3e0a3..621f2c6bf377 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -178,48 +178,52 @@ public static CalendarInterval fromSingleUnitString(String unit, String s) "Interval string does not match day-time format of 'd h:m:s.n': " + s); } else { try { - if (unit.equals("year")) { - int year = (int) toLongWithRange("year", m.group(1), - Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12); - result = new CalendarInterval(year * 12, 0L); - - } else if (unit.equals("month")) { - int month = (int) toLongWithRange("month", m.group(1), - Integer.MIN_VALUE, Integer.MAX_VALUE); - result = new CalendarInterval(month, 0L); - - } else if (unit.equals("week")) { - long week = toLongWithRange("week", m.group(1), - Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK); - result = new CalendarInterval(0, week * MICROS_PER_WEEK); - - } else if (unit.equals("day")) { - long day = toLongWithRange("day", m.group(1), - Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY); - result = new CalendarInterval(0, day * MICROS_PER_DAY); - - } else if (unit.equals("hour")) { - long hour = toLongWithRange("hour", m.group(1), - Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR); - result = new CalendarInterval(0, hour * MICROS_PER_HOUR); - - } else if (unit.equals("minute")) { - long minute = toLongWithRange("minute", m.group(1), - Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE); - result = new CalendarInterval(0, minute * MICROS_PER_MINUTE); - - } else if (unit.equals("second")) { - long micros = parseSecondNano(m.group(1)); - result = new CalendarInterval(0, micros); - - } else if (unit.equals("millisecond")) { - long millisecond = toLongWithRange("millisecond", m.group(1), - Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI); - result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI); - - } else if (unit.equals("microsecond")) { - long micros = Long.parseLong(m.group(1)); - result = new CalendarInterval(0, micros); + switch (unit) { + case "year": + int year = (int) toLongWithRange("year", m.group(1), + Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 
12); + result = new CalendarInterval(year * 12, 0L); + break; + case "month": + int month = (int) toLongWithRange("month", m.group(1), + Integer.MIN_VALUE, Integer.MAX_VALUE); + result = new CalendarInterval(month, 0L); + break; + case "week": + long week = toLongWithRange("week", m.group(1), + Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK); + result = new CalendarInterval(0, week * MICROS_PER_WEEK); + break; + case "day": + long day = toLongWithRange("day", m.group(1), + Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY); + result = new CalendarInterval(0, day * MICROS_PER_DAY); + break; + case "hour": + long hour = toLongWithRange("hour", m.group(1), + Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR); + result = new CalendarInterval(0, hour * MICROS_PER_HOUR); + break; + case "minute": + long minute = toLongWithRange("minute", m.group(1), + Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE); + result = new CalendarInterval(0, minute * MICROS_PER_MINUTE); + break; + case "second": { + long micros = parseSecondNano(m.group(1)); + result = new CalendarInterval(0, micros); + break; + } + case "millisecond": + long millisecond = toLongWithRange("millisecond", m.group(1), + Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI); + result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI); + break; + case "microsecond": { + long micros = Long.parseLong(m.group(1)); + result = new CalendarInterval(0, micros); + break; + } } } catch (Exception e) { throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e); diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java index ca7babc3f01c..fd0f495ca29d 100644 --- a/core/src/main/java/org/apache/spark/api/java/Optional.java +++ b/core/src/main/java/org/apache/spark/api/java/Optional.java @@ -18,6 +18,7 @@ package org.apache.spark.api.java; import java.io.Serializable; +import java.util.Objects; import com.google.common.base.Preconditions; @@ -52,8 +53,8 @@ *
 *   <li>{@link #isPresent()}</li>
 * </ul>
 *
- * <p>{@code java.util.Optional} itself is not used at this time because the
- * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * <p>{@code java.util.Optional} itself was not used because at the time, the
+ * project did not require Java 8. Using {@code com.google.common.base.Optional}
  * has in the past caused serious library version conflicts with Guava that can't
  * be resolved by shading. Hence this work-alike clone.
    * @@ -171,7 +172,7 @@ public boolean equals(Object obj) { return false; } Optional other = (Optional) obj; - return value == null ? other.value == null : value.equals(other.value); + return Objects.equals(value, other.value); } @Override diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java index 07aebb75e8f4..33bedf7ebcb0 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java @@ -24,6 +24,7 @@ * A function that returns zero or more output records from each grouping key and its values from 2 * Datasets. */ +@FunctionalInterface public interface CoGroupFunction extends Serializable { Iterator call(K key, Iterator left, Iterator right) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java index 576087b6f428..2f23da5bfec1 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java @@ -23,6 +23,7 @@ /** * A function that returns zero or more records of type Double from each input record. */ +@FunctionalInterface public interface DoubleFlatMapFunction extends Serializable { Iterator call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java index bf16f791f906..3c0291cf4624 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java @@ -22,6 +22,7 @@ /** * A function that returns Doubles, and can be used to construct DoubleRDDs. */ +@FunctionalInterface public interface DoubleFunction extends Serializable { double call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java index 462ca3f6f6d1..a6f69f7cdca8 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java @@ -24,6 +24,7 @@ * * If the function returns true, the element is included in the returned Dataset. */ +@FunctionalInterface public interface FilterFunction extends Serializable { boolean call(T value) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index 2d8ea6d1a5a7..91d61292f167 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -23,6 +23,7 @@ /** * A function that returns zero or more output records from each input record. 
*/ +@FunctionalInterface public interface FlatMapFunction extends Serializable { Iterator call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index fc97b63f825d..f9f2580b01f4 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -23,6 +23,7 @@ /** * A function that takes two inputs and returns zero or more output records. */ +@FunctionalInterface public interface FlatMapFunction2 extends Serializable { Iterator call(T1 t1, T2 t2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java index bae574ab5755..6423c5d0fce5 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java @@ -23,6 +23,7 @@ /** * A function that returns zero or more output records from each grouping key and its values. */ +@FunctionalInterface public interface FlatMapGroupsFunction extends Serializable { Iterator call(K key, Iterator values) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java index 07e54b28fa12..2e6e90818d58 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java @@ -24,6 +24,7 @@ * * Spark will invoke the call function on each element in the input Dataset. */ +@FunctionalInterface public interface ForeachFunction extends Serializable { void call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java index 4938a51bcd71..d8f55d0ae1dc 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java @@ -23,6 +23,7 @@ /** * Base interface for a function used in Dataset's foreachPartition function. */ +@FunctionalInterface public interface ForeachPartitionFunction extends Serializable { void call(Iterator t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java index b9d9777a7565..8b2bbd501c49 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java @@ -24,6 +24,7 @@ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed * when mapping RDDs of other types. 
*/ +@FunctionalInterface public interface Function extends Serializable { R call(T1 v1) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java index c86928dd0540..5c649d9de414 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java @@ -22,6 +22,7 @@ /** * A zero-argument function that returns an R. */ +@FunctionalInterface public interface Function0 extends Serializable { R call() throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java index a975ce3c6819..a7d964709515 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java @@ -22,6 +22,7 @@ /** * A two-argument function that takes arguments of type T1 and T2 and returns an R. */ +@FunctionalInterface public interface Function2 extends Serializable { R call(T1 v1, T2 v2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java index 6eecfb645a66..77acd21d4eff 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java @@ -22,6 +22,7 @@ /** * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. */ +@FunctionalInterface public interface Function3 extends Serializable { R call(T1 v1, T2 v2, T3 v3) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java index 9c35a22ca9d0..d530ba446b3c 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java @@ -22,6 +22,7 @@ /** * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R. */ +@FunctionalInterface public interface Function4 extends Serializable { R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java index 3ae6ef44898e..5efff943c8cd 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java @@ -22,6 +22,7 @@ /** * Base interface for a map function used in Dataset's map function. */ +@FunctionalInterface public interface MapFunction extends Serializable { U call(T value) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java index faa59eabc8b4..2c3d43afc0b3 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java @@ -23,6 +23,7 @@ /** * Base interface for a map function used in GroupedDataset's mapGroup function. 
*/ +@FunctionalInterface public interface MapGroupsFunction extends Serializable { R call(K key, Iterator values) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java index cf9945a215af..68e8557c88d1 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java @@ -23,6 +23,7 @@ /** * Base interface for function used in Dataset's mapPartitions. */ +@FunctionalInterface public interface MapPartitionsFunction extends Serializable { Iterator call(Iterator input) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java index 51eed2e67b9f..97bd2b37a059 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -26,6 +26,7 @@ * A function that returns zero or more key-value pair records from each input record. The * key-value pairs are represented as scala.Tuple2 objects. */ +@FunctionalInterface public interface PairFlatMapFunction extends Serializable { Iterator> call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java index 2fdfa7184a3b..34a7e4489a31 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java @@ -25,6 +25,7 @@ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to * construct PairRDDs. */ +@FunctionalInterface public interface PairFunction extends Serializable { Tuple2 call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java index ee092d0058f4..d9029d85387a 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java @@ -22,6 +22,7 @@ /** * Base interface for function used in Dataset's reduce. */ +@FunctionalInterface public interface ReduceFunction extends Serializable { T call(T v1, T v2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java index f30d42ee5796..aff2bc6e94fb 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java @@ -22,6 +22,7 @@ /** * A function with no return value. 
*/ +@FunctionalInterface public interface VoidFunction extends Serializable { void call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java index da9ae1c9c5cd..ddb616241b24 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java @@ -22,6 +22,7 @@ /** * A two-argument function that takes arguments of type T1 and T2 with no return value. */ +@FunctionalInterface public interface VoidFunction2 extends Serializable { void call(T1 v1, T2 v2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index dcae4a34c4b0..189d607fa6c5 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -162,14 +162,7 @@ private UnsafeExternalSorter( // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at // the end of the task. This is necessary to avoid memory leaks in when the downstream operator // does not fully consume the sorter's output (e.g. sort followed by limit). - taskContext.addTaskCompletionListener( - new TaskCompletionListener() { - @Override - public void onTaskCompletion(TaskContext context) { - cleanupResources(); - } - } - ); + taskContext.addTaskCompletionListener(context -> { cleanupResources(); }); } /** diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java index 01aed95878cf..cf4dfde86ca9 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java @@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger { private final PriorityQueue priorityQueue; UnsafeSorterSpillMerger( - final RecordComparator recordComparator, - final PrefixComparator prefixComparator, - final int numSpills) { - final Comparator comparator = new Comparator() { - - @Override - public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) { - final int prefixComparisonResult = - prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix()); - if (prefixComparisonResult == 0) { - return recordComparator.compare( - left.getBaseObject(), left.getBaseOffset(), - right.getBaseObject(), right.getBaseOffset()); - } else { - return prefixComparisonResult; - } + RecordComparator recordComparator, + PrefixComparator prefixComparator, + int numSpills) { + Comparator comparator = (left, right) -> { + int prefixComparisonResult = + prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix()); + if (prefixComparisonResult == 0) { + return recordComparator.compare( + left.getBaseObject(), left.getBaseOffset(), + right.getBaseObject(), right.getBaseOffset()); + } else { + return prefixComparisonResult; } }; priorityQueue = new PriorityQueue<>(numSpills, comparator); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cbab7b8844ee..7e564061e69b 100644 --- 
a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging { private def warnDeprecatedVersions(): Unit = { val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3) - if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) { - logWarning("Support for Java 7 is deprecated as of Spark 2.0.0") - } if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) { logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0") } diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala index 31b9c5edf003..3fd812e9fcfe 100644 --- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala +++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala @@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator)) cmd.add(s"-Xmx${memoryMb}M") command.javaOpts.foreach(cmd.add) - CommandBuilderUtils.addPermGenSizeOpt(cmd) addOptionString(cmd, getenv("SPARK_JAVA_OPTS")) cmd } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fe6fe6aa4f01..1e6e9a223e29 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging { def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = { // Politely destroy first process.destroy() - - if (waitForProcess(process, timeoutMs)) { + if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) { // Successful exit Option(process.exitValue()) } else { - // Java 8 added a new API which will more forcibly kill the process. Use that if available. try { - classOf[Process].getMethod("destroyForcibly").invoke(process) + process.destroyForcibly() } catch { - case _: NoSuchMethodException => return None // Not available; give up case NonFatal(e) => logWarning("Exception when attempting to kill process", e) } // Wait, again, although this really should return almost immediately - if (waitForProcess(process, timeoutMs)) { + if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) { Option(process.exitValue()) } else { logWarning("Timed out waiting to forcibly kill process") @@ -1904,45 +1901,12 @@ private[spark] object Utils extends Logging { } } - /** - * Wait for a process to terminate for at most the specified duration. - * - * @return whether the process actually terminated before the given timeout. 
- */ - def waitForProcess(process: Process, timeoutMs: Long): Boolean = { - try { - // Use Java 8 method if available - classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit]) - .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS) - .asInstanceOf[Boolean] - } catch { - case _: NoSuchMethodException => - // Otherwise implement it manually - var terminated = false - val startTime = System.currentTimeMillis - while (!terminated) { - try { - process.exitValue() - terminated = true - } catch { - case e: IllegalThreadStateException => - // Process not terminated yet - if (System.currentTimeMillis - startTime > timeoutMs) { - return false - } - Thread.sleep(100) - } - } - true - } - } - /** * Return the stderr of a process after waiting for the process to terminate. * If the process does not terminate within the specified timeout, return None. */ def getStderr(process: Process, timeoutMs: Long): Option[String] = { - val terminated = Utils.waitForProcess(process, timeoutMs) + val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS) if (terminated) { Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n")) } else { diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java similarity index 99% rename from external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java rename to core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java index fa3a66e73ced..e22ad89c1d6e 100644 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package test.org.apache.spark.java8; +package test.org.apache.spark; import java.io.File; import java.io.Serializable; diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java similarity index 99% rename from core/src/test/java/org/apache/spark/JavaAPISuite.java rename to core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 7bebe0612f9a..80aab100aced 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark; +package test.org.apache.spark; import java.io.*; import java.nio.channels.FileChannel; @@ -34,6 +34,12 @@ import java.util.Set; import java.util.concurrent.*; +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.Partitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; import scala.Tuple2; import scala.Tuple3; import scala.Tuple4; diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 6027310a963e..43f77e68c153 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -919,7 +919,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(pidExists(pid)) val terminated = Utils.terminateProcess(process, 5000) assert(terminated.isDefined) - Utils.waitForProcess(process, 5000) + process.waitFor(5, TimeUnit.SECONDS) val durationMs = System.currentTimeMillis() - startTimeMs assert(durationMs < 5000) assert(!pidExists(pid)) @@ -932,7 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { var majorVersion = versionParts(0).toInt if (majorVersion == 1) majorVersion = versionParts(1).toInt if (majorVersion >= 8) { - // Java8 added a way to forcibly terminate a process. We'll make sure that works by + // We'll make sure that forcibly terminating a process works by // creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On // older versions of java, this will *not* terminate. val file = File.createTempFile("temp-file-name", ".tmp") @@ -953,7 +953,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val start = System.currentTimeMillis() val terminated = Utils.terminateProcess(process, 5000) assert(terminated.isDefined) - Utils.waitForProcess(process, 5000) + process.waitFor(5, TimeUnit.SECONDS) val duration = System.currentTimeMillis() - start assert(duration < 6000) // add a little extra time to allow a force kill to finish assert(!pidExists(pid)) diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1 index 1350095a945d..1c34f1bbc1aa 100644 --- a/dev/appveyor-install-dependencies.ps1 +++ b/dev/appveyor-install-dependencies.ps1 @@ -90,7 +90,7 @@ Invoke-Expression "7z.exe x maven.zip" # add maven to environment variables $env:Path += ";$tools\apache-maven-$mavenVer\bin" $env:M2_HOME = "$tools\apache-maven-$mavenVer" -$env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" +$env:MAVEN_OPTS = "-Xmx2g -XX:ReservedCodeCacheSize=512m" Pop-Location diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index d616f80c541a..e1db997a7d41 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -267,7 +267,6 @@ if [[ "$1" == "docs" ]]; then echo "Building Spark docs" dest_dir="$REMOTE_PARENT_DIR/${DEST_DIR_NAME}-docs" cd docs - # Compile docs with Java 7 to use nicer format # TODO: Make configurable to add this: PRODUCTION=1 PRODUCTION=1 RELEASE_VERSION="$SPARK_VERSION" jekyll build echo "Copying release documentation to $dest_dir" diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index dc8dfb934419..22cdfd467cd9 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -146,7 +146,7 @@ fi # Build uber fat JAR cd 
"$SPARK_HOME" -export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m}" +export MAVEN_OPTS="${MAVEN_OPTS:-Xmx2g -XX:ReservedCodeCacheSize=512m}" # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. diff --git a/dev/mima b/dev/mima index 11c4af29808a..eca78ad109b8 100755 --- a/dev/mima +++ b/dev/mima @@ -31,7 +31,6 @@ OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export rm -f .generated-mima* java \ - -XX:MaxPermSize=1g \ -Xmx2g \ -cp "$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" \ org.apache.spark.tools.GenerateMIMAIgnore diff --git a/dev/run-tests.py b/dev/run-tests.py index 0e7f5ffd8d51..04035b33e6a6 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -492,9 +492,6 @@ def main(): java_version = determine_java_version(java_exe) - if java_version.minor < 8: - print("[warn] Java 8 tests will not run because JDK version is < 1.8.") - # install SparkR if which("R"): run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")]) diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index eb43f229c2d4..2906a81f61cd 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -46,7 +46,7 @@ OLD_VERSION=$($MVN -q \ -Dexec.executable="echo" \ -Dexec.args='${project.version}' \ --non-recursive \ - org.codehaus.mojo:exec-maven-plugin:1.3.1:exec) + org.codehaus.mojo:exec-maven-plugin:1.5.0:exec) if [ $? != 0 ]; then echo -e "Error while getting version string from Maven:\n$OLD_VERSION" exit 1 diff --git a/docs/building-spark.md b/docs/building-spark.md index 690c656bad9f..56b892696ee2 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -12,8 +12,8 @@ redirect_from: "building-with-maven.html" ## Apache Maven The Maven-based build is the build of reference for Apache Spark. -Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+. -Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0. +Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+. +Note that support for Java 7 was removed as of Spark 2.2.0. ### Setting up Maven's Memory Usage @@ -21,28 +21,18 @@ You'll need to configure Maven to use more memory than usual by setting `MAVEN_O export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m" -When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS. - +(The `ReservedCodeCacheSize` setting is optional but recommended.) If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following: - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... - [ERROR] PermGen space -> [Help 1] - [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes... [ERROR] Java heap space -> [Help 1] - [INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes... - OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled. - OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize= - You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before. 
**Note:** * If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable. -* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`. -* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless. - +* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`. ### build/mvn @@ -224,20 +214,6 @@ To run test suites of a specific sub project as follows: ./build/sbt core/test -## Running Java 8 Test Suites - -Running only Java 8 tests and nothing else. - - ./build/mvn install -DskipTests - ./build/mvn -pl :java8-tests_2.11 test - -or - - ./build/sbt java8-tests/test - -Java 8 tests are automatically enabled when a Java 8 JDK is detected. -If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests. - ## PySpark pip installable If you are building Spark for use in a Python environment and you wish to pip install it, you will first need to build the Spark JARs as described above. Then you can construct an sdist package suitable for setup.py and pip installable package. diff --git a/docs/index.md b/docs/index.md index 023e06ada369..19a9d3bfc601 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,11 +26,13 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} +Spark runs on Java 8+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version ({{site.SCALA_BINARY_VERSION}}.x). -Note that support for Java 7 and Python 2.6 are deprecated as of Spark 2.0.0, and support for +Note that support for Java 7 was removed as of Spark 2.2.0. + +Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and support for Scala 2.10 and versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0, and may be removed in Spark 2.2.0. diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index 3085539b40e6..034e89e25000 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -222,7 +222,7 @@ svmAlg.optimizer() .setNumIterations(200) .setRegParam(0.1) .setUpdater(new L1Updater()); -final SVMModel modelL1 = svmAlg.run(training.rdd()); +SVMModel modelL1 = svmAlg.run(training.rdd()); {% endhighlight %} In order to run the above application, follow the instructions diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md index 430c0690457e..c29400af8505 100644 --- a/docs/mllib-statistics.md +++ b/docs/mllib-statistics.md @@ -317,12 +317,7 @@ JavaSparkContext jsc = ... // standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions. JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10); // Apply a transform to get a random double RDD following `N(1, 4)`. 
-JavaDoubleRDD v = u.map( - new Function() { - public Double call(Double x) { - return 1.0 + 2.0 * x; - } - }); +JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x); {% endhighlight %} diff --git a/docs/programming-guide.md b/docs/programming-guide.md index db8b048fcef9..6740dbe0014b 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -54,12 +54,12 @@ import org.apache.spark.SparkConf
    -Spark {{site.SPARK_VERSION}} works with Java 7 and higher. If you are using Java 8, Spark supports +Spark {{site.SPARK_VERSION}} supports [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) for concisely writing functions, otherwise you can use the classes in the [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package. -Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0. +Note that support for Java 7 was removed in Spark 2.2.0. To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at: @@ -295,11 +295,6 @@ JavaRDD distData = sc.parallelize(data); Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list. We describe operations on distributed datasets later on. -**Note:** *In this guide, we'll often use the concise Java 8 lambda syntax to specify Java functions, but -in older versions of Java you can implement the interfaces in the -[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package. -We describe [passing functions to Spark](#passing-functions-to-spark) in more detail below.* -
    @@ -658,7 +653,7 @@ There are two ways to create such functions: * Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark. -* In Java 8, use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) +* Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html) to concisely define an implementation. While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs diff --git a/docs/quick-start.md b/docs/quick-start.md index 0836c602feaf..04ac27876252 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -320,13 +320,8 @@ public class SimpleApp { JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD logData = sc.textFile(logFile).cache(); - long numAs = logData.filter(new Function() { - public Boolean call(String s) { return s.contains("a"); } - }).count(); - - long numBs = logData.filter(new Function() { - public Boolean call(String s) { return s.contains("b"); } - }).count(); + long numAs = logData.filter(s -> s.contains("a")).count(); + long numBs = logData.filter(s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 117996db9d09..d4ddcb16bdd0 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -113,15 +113,13 @@ public class JavaCustomReceiver extends Receiver { port = port_; } + @Override public void onStart() { // Start the thread that receives data over a connection - new Thread() { - @Override public void run() { - receive(); - } - }.start(); + new Thread(this::receive).start(); } + @Override public void onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false @@ -189,7 +187,7 @@ The full source code is in the example [CustomReceiver.scala]({{site.SPARK_GITHU {% highlight java %} // Assuming ssc is the JavaStreamingContext JavaDStream customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port)); -JavaDStream words = lines.flatMap(new FlatMapFunction() { ... }); +JavaDStream words = lines.flatMap(s -> ...); ... {% endhighlight %} diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 6ef54ac21070..e3837013168d 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("topicA", "topicB"); -final JavaInputDStream> stream = +JavaInputDStream> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams) ); -stream.mapToPair( - new PairFunction, String, String>() { - @Override - public Tuple2 call(ConsumerRecord record) { - return new Tuple2<>(record.key(), record.value()); - } - }) +stream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); {% endhighlight %}
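To make the converted Kafka fragment above a bit more concrete, here is one possible continuation (illustrative only, not part of the patch; it assumes the `stream` variable defined earlier plus imports of `scala.Tuple2` and `org.apache.spark.streaming.api.java.JavaPairDStream`), aggregating the key/value pairs per batch:

{% highlight java %}
// Same conversion as above: each ConsumerRecord becomes a (key, value) pair.
JavaPairDStream<String, String> pairs =
    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

// Count how many records arrived per key in each batch and print a sample.
pairs.mapToPair(kv -> new Tuple2<>(kv._1(), 1L))
     .reduceByKey((a, b) -> a + b)
     .print();
{% endhighlight %}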
    @@ -162,19 +156,13 @@ stream.foreachRDD { rdd =>
    {% highlight java %} -stream.foreachRDD(new VoidFunction>>() { - @Override - public void call(JavaRDD> rdd) { - final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - rdd.foreachPartition(new VoidFunction>>() { - @Override - public void call(Iterator> consumerRecords) { - OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); - } - }); - } +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + rdd.foreachPartition(consumerRecords -> { + OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); + }); }); {% endhighlight %}
    @@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle
    {% highlight java %} -stream.foreachRDD(new VoidFunction>>() { - @Override - public void call(JavaRDD> rdd) { - OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - // some time later, after outputs have completed - ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); - } + // some time later, after outputs have completed + ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); }); {% endhighlight %}
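A side note rather than part of the patch: `CanCommitOffsets.commitAsync` also has an overload taking Kafka's `org.apache.kafka.clients.consumer.OffsetCommitCallback`, a single-method interface, so it fits the same lambda style. A sketch, assuming `stream` and `offsetRanges` as in the example above:

{% highlight java %}
// Commit asynchronously and observe whether the commit actually succeeded.
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges,
  (committedOffsets, exception) -> {
    if (exception != null) {
      // The commit failed: log it and decide whether to retry or rely on a later commit.
    }
  });
{% endhighlight %}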
    @@ -268,21 +253,18 @@ JavaInputDStream> stream = KafkaUtils.createDirec ConsumerStrategies.Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) ); -stream.foreachRDD(new VoidFunction>>() { - @Override - public void call(JavaRDD> rdd) { - OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - - Object results = yourCalculation(rdd); +stream.foreachRDD(rdd -> { + OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + + Object results = yourCalculation(rdd); - // begin your transaction + // begin your transaction - // update results - // update offsets where the end of existing offsets matches the beginning of this batch of offsets - // assert that offsets were updated correctly + // update results + // update offsets where the end of existing offsets matches the beginning of this batch of offsets + // assert that offsets were updated correctly - // end your transaction - } + // end your transaction }); {% endhighlight %} diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md index 58b17aa4ce88..24a3e4cdbbd7 100644 --- a/docs/streaming-kafka-0-8-integration.md +++ b/docs/streaming-kafka-0-8-integration.md @@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application.
    // Hold a reference to the current offset ranges, so it can be used downstream - final AtomicReference offsetRanges = new AtomicReference<>(); - - directKafkaStream.transformToPair( - new Function, JavaPairRDD>() { - @Override - public JavaPairRDD call(JavaPairRDD rdd) throws Exception { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - offsetRanges.set(offsets); - return rdd; - } - } - ).map( + AtomicReference offsetRanges = new AtomicReference<>(); + + directKafkaStream.transformToPair(rdd -> { + OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + offsetRanges.set(offsets); + return rdd; + }).map( ... - ).foreachRDD( - new Function, Void>() { - @Override - public Void call(JavaPairRDD rdd) throws IOException { - for (OffsetRange o : offsetRanges.get()) { - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() - ); - } - ... - return null; - } - } - ); + ).foreachRDD(rdd -> { + for (OffsetRange o : offsetRanges.get()) { + System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() + ); + } + ... + });
    offsetRanges = [] diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index a878971608b3..abd4ac965360 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -163,12 +163,7 @@ space into words. {% highlight java %} // Split each line into words -JavaDStream words = lines.flatMap( - new FlatMapFunction() { - @Override public Iterator call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }); +JavaDStream words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); {% endhighlight %} `flatMap` is a DStream operation that creates a new DStream by @@ -183,18 +178,8 @@ Next, we want to count these words. {% highlight java %} // Count each word in each batch -JavaPairDStream pairs = words.mapToPair( - new PairFunction() { - @Override public Tuple2 call(String s) { - return new Tuple2<>(s, 1); - } - }); -JavaPairDStream wordCounts = pairs.reduceByKey( - new Function2() { - @Override public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); +JavaPairDStream pairs = words.mapToPair(s -> new Tuple2<>(s, 1)); +JavaPairDStream wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2); // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print(); @@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count. {% highlight java %} Function2, Optional, Optional> updateFunction = - new Function2, Optional, Optional>() { - @Override public Optional call(List values, Optional state) { - Integer newSum = ... // add the new values with the previous running count to get the new count - return Optional.of(newSum); - } + (values, state) -> { + Integer newSum = ... // add the new values with the previous running count to get the new count + return Optional.of(newSum); }; {% endhighlight %} @@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd => {% highlight java %} import org.apache.spark.streaming.api.java.*; // RDD containing spam information -final JavaPairRDD spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); +JavaPairRDD spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); -JavaPairDStream cleanedDStream = wordCounts.transform( - new Function, JavaPairRDD>() { - @Override public JavaPairRDD call(JavaPairRDD rdd) throws Exception { - rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning - ... - } - }); +JavaPairDStream cleanedDStream = wordCounts.transform(rdd -> { + rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning + ... +}); {% endhighlight %}
    @@ -986,15 +966,8 @@ val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Se
    {% highlight java %} -// Reduce function adding two integers, defined separately for clarity -Function2 reduceFunc = new Function2() { - @Override public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } -}; - // Reduce last 30 seconds of data, every 10 seconds -JavaPairDStream windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10)); +JavaPairDStream windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10)); {% endhighlight %}
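Related, though not touched by the patch: the incremental form of `reduceByKeyAndWindow`, which also takes an inverse "subtract" function and requires checkpointing to be enabled, reads just as naturally with lambdas:

{% highlight java %}
// Maintain the 30-second window incrementally: add counts entering the window and
// subtract counts leaving it. Requires a checkpoint directory to be configured.
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(
    (i1, i2) -> i1 + i2,   // new data entering the window
    (i1, i2) -> i1 - i2,   // old data leaving the window
    Durations.seconds(30),
    Durations.seconds(10));
{% endhighlight %}

The inverse function lets Spark update the window from the batches that enter and leave it instead of re-reducing the whole window on every slide.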
    @@ -1141,14 +1114,7 @@ val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } {% highlight java %} JavaPairRDD dataset = ... JavaPairDStream windowedStream = stream.window(Durations.seconds(20)); -JavaPairDStream joinedStream = windowedStream.transform( - new Function>, JavaRDD>>() { - @Override - public JavaRDD> call(JavaRDD> rdd) { - return rdd.join(dataset); - } - } -); +JavaPairDStream joinedStream = windowedStream.transform(rdd -> rdd.join(dataset)); {% endhighlight %}
    @@ -1248,17 +1214,11 @@ dstream.foreachRDD { rdd =>
    {% highlight java %} -dstream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - final Connection connection = createNewConnection(); // executed at the driver - rdd.foreach(new VoidFunction() { - @Override - public void call(String record) { - connection.send(record); // executed at the worker - } - }); - } +dstream.foreachRDD(rdd -> { + Connection connection = createNewConnection(); // executed at the driver + rdd.foreach(record -> { + connection.send(record); // executed at the worker + }); }); {% endhighlight %}
    @@ -1297,18 +1257,12 @@ dstream.foreachRDD { rdd =>
    {% highlight java %} -dstream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - rdd.foreach(new VoidFunction() { - @Override - public void call(String record) { - Connection connection = createNewConnection(); - connection.send(record); - connection.close(); - } - }); - } +dstream.foreachRDD(rdd -> { + rdd.foreach(record -> { + Connection connection = createNewConnection(); + connection.send(record); + connection.close(); + }); }); {% endhighlight %}
    @@ -1344,20 +1298,14 @@ dstream.foreachRDD { rdd =>
    {% highlight java %} -dstream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - rdd.foreachPartition(new VoidFunction>() { - @Override - public void call(Iterator partitionOfRecords) { - Connection connection = createNewConnection(); - while (partitionOfRecords.hasNext()) { - connection.send(partitionOfRecords.next()); - } - connection.close(); - } - }); - } +dstream.foreachRDD(rdd -> { + rdd.foreachPartition(partitionOfRecords -> { + Connection connection = createNewConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + connection.close(); + }); }); {% endhighlight %}
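One small hardening of the pattern above (an editorial sketch, not something the patch changes): if `send` can fail, closing the connection in a `finally` block keeps it from leaking. Same placeholder API as the guide:

{% highlight java %}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    try {
      while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
      }
    } finally {
      connection.close();  // runs even if send() fails part-way through the partition
    }
  });
});
{% endhighlight %}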
    @@ -1396,21 +1344,15 @@ dstream.foreachRDD { rdd =>
    {% highlight java %} -dstream.foreachRDD(new VoidFunction>() { - @Override - public void call(JavaRDD rdd) { - rdd.foreachPartition(new VoidFunction>() { - @Override - public void call(Iterator partitionOfRecords) { - // ConnectionPool is a static, lazily initialized pool of connections - Connection connection = ConnectionPool.getConnection(); - while (partitionOfRecords.hasNext()) { - connection.send(partitionOfRecords.next()); - } - ConnectionPool.returnConnection(connection); // return to the pool for future reuse - } - }); - } +dstream.foreachRDD(rdd -> { + rdd.foreachPartition(partitionOfRecords -> { + // ConnectionPool is a static, lazily initialized pool of connections + Connection connection = ConnectionPool.getConnection(); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + ConnectionPool.returnConnection(connection); // return to the pool for future reuse + }); }); {% endhighlight %}
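The guide treats `ConnectionPool` as a placeholder. Purely as an illustration of what a static, lazily initialized pool could look like (every name below is hypothetical, none of it comes from the patch):

{% highlight java %}
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical Connection type; in practice this is the sink system's client class.
interface Connection {
  void send(String record);
  void close();
}

final class ConnectionPool {
  private static final ConcurrentLinkedQueue<Connection> IDLE = new ConcurrentLinkedQueue<>();

  static Connection getConnection() {
    Connection c = IDLE.poll();                      // reuse an idle connection when one exists
    return (c != null) ? c : createNewConnection();  // otherwise open one lazily
  }

  static void returnConnection(Connection c) {
    IDLE.offer(c);                                   // make it available to later partitions
  }

  private static Connection createNewConnection() {
    // Open and return a real connection to the external system here.
    throw new UnsupportedOperationException("replace with real connection setup");
  }
}
{% endhighlight %}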
    @@ -1495,35 +1437,26 @@ public class JavaRow implements java.io.Serializable { JavaDStream words = ... -words.foreachRDD( - new Function2, Time, Void>() { - @Override - public Void call(JavaRDD rdd, Time time) { - - // Get the singleton instance of SparkSession - SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate(); +words.foreachRDD((rdd, time) -> { + // Get the singleton instance of SparkSession + SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate(); - // Convert RDD[String] to RDD[case class] to DataFrame - JavaRDD rowRDD = rdd.map(new Function() { - public JavaRow call(String word) { - JavaRow record = new JavaRow(); - record.setWord(word); - return record; - } - }); - DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class); + // Convert RDD[String] to RDD[case class] to DataFrame + JavaRDD rowRDD = rdd.map(word -> { + JavaRow record = new JavaRow(); + record.setWord(word); + return record; + }); + DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class); - // Creates a temporary view using the DataFrame - wordsDataFrame.createOrReplaceTempView("words"); + // Creates a temporary view using the DataFrame + wordsDataFrame.createOrReplaceTempView("words"); - // Do word count on table using SQL and print it - DataFrame wordCountsDataFrame = - spark.sql("select word, count(*) as total from words group by word"); - wordCountsDataFrame.show(); - return null; - } - } -); + // Do word count on table using SQL and print it + DataFrame wordCountsDataFrame = + spark.sql("select word, count(*) as total from words group by word"); + wordCountsDataFrame.show(); +}); {% endhighlight %} See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java). 
@@ -1883,27 +1816,21 @@ class JavaDroppedWordsCounter { } } -wordCounts.foreachRDD(new Function2, Time, Void>() { - @Override - public Void call(JavaPairRDD rdd, Time time) throws IOException { - // Get or register the blacklist Broadcast - final Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); - // Get or register the droppedWordsCounter Accumulator - final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); - // Use blacklist to drop words and use droppedWordsCounter to count them - String counts = rdd.filter(new Function, Boolean>() { - @Override - public Boolean call(Tuple2 wordCount) throws Exception { - if (blacklist.value().contains(wordCount._1())) { - droppedWordsCounter.add(wordCount._2()); - return false; - } else { - return true; - } - } - }).collect().toString(); - String output = "Counts at time " + time + " " + counts; - } +wordCounts.foreachRDD((rdd, time) -> { + // Get or register the blacklist Broadcast + Broadcast> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(wordCount -> { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; + } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; } {% endhighlight %} diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b816072cb8c8..ad3b2fb26dd6 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -103,13 +103,7 @@ Dataset lines = spark // Split the lines into words Dataset words = lines .as(Encoders.STRING()) - .flatMap( - new FlatMapFunction() { - @Override - public Iterator call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }, Encoders.STRING()); + .flatMap((FlatMapFunction) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING()); // Generate running word count Dataset wordCounts = words.groupBy("value").count(); @@ -517,7 +511,7 @@ val csvDF = spark SparkSession spark = ... 
// Read text from socket -Dataset[Row] socketDF = spark +Dataset socketDF = spark .readStream() .format("socket") .option("host", "localhost") @@ -530,7 +524,7 @@ socketDF.printSchema(); // Read all the csv files written atomically in a directory StructType userSchema = new StructType().add("name", "string").add("age", "integer"); -Dataset[Row] csvDF = spark +Dataset csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the csv files @@ -625,33 +619,15 @@ Dataset ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // // Select the devices which have signal more than 10 df.select("device").where("signal > 10"); // using untyped APIs -ds.filter(new FilterFunction() { // using typed APIs - @Override - public boolean call(DeviceData value) throws Exception { - return value.getSignal() > 10; - } -}).map(new MapFunction() { - @Override - public String call(DeviceData value) throws Exception { - return value.getDevice(); - } -}, Encoders.STRING()); +ds.filter((FilterFunction) value -> value.getSignal() > 10) + .map((MapFunction) value -> value.getDevice(), Encoders.STRING()); // Running count of the number of updates for each device type df.groupBy("deviceType").count(); // using untyped API // Running average signal for each device type -ds.groupByKey(new MapFunction() { // using typed API - @Override - public String call(DeviceData value) throws Exception { - return value.getDeviceType(); - } -}, Encoders.STRING()).agg(typed.avg(new MapFunction() { - @Override - public Double call(DeviceData value) throws Exception { - return value.getSignal(); - } -})); +ds.groupByKey((MapFunction) value -> value.getDeviceType(), Encoders.STRING()) + .agg(typed.avg((MapFunction) value -> value.getSignal())); {% endhighlight %} diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java index f42fd3317b79..004e9b12f626 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java @@ -69,9 +69,9 @@ public static void main(String[] args) { .setOutputCol("words") .setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false); - spark.udf().register("countTokens", new UDF1() { + spark.udf().register("countTokens", new UDF1, Integer>() { @Override - public Integer call(WrappedArray words) { + public Integer call(WrappedArray words) { return words.size(); } }, DataTypes.IntegerType); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java index 1860594e8e54..b687fae5a1da 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java @@ -224,7 +224,7 @@ private static void runJsonDatasetExample(SparkSession spark) { "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD anotherPeopleRDD = new JavaSparkContext(spark.sparkContext()).parallelize(jsonData); - Dataset anotherPeople = spark.read().json(anotherPeopleRDD); + Dataset anotherPeople = spark.read().json(anotherPeopleRDD); anotherPeople.show(); // +---------------+----+ // | address|name| diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md deleted file mode 100644 index aa87901695c2..000000000000 
--- a/external/java8-tests/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# Java 8 Test Suites - -These tests require having Java 8 installed and are isolated from the main Spark build. -If Java 8 is not your system's default Java version, you will need to point Spark's build -to your Java location. The set-up depends a bit on the build system: - -* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass - `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically - include the Java 8 test project. - - `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean java8-tests/test - -* For Maven users, - - Maven users can also refer to their Java 8 directory using JAVA_HOME. - - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn -pl :java8-tests_2.11 test` - - Note that the above command can only be run from project root directory since this module - depends on core and the test-jars of core and streaming. This means an install step is - required to make the test dependencies visible to the Java 8 sub-project. diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml deleted file mode 100644 index 8fc46d7af219..000000000000 --- a/external/java8-tests/pom.xml +++ /dev/null @@ -1,132 +0,0 @@ - - - - 4.0.0 - - org.apache.spark - spark-parent_2.11 - 2.2.0-SNAPSHOT - ../../pom.xml - - - java8-tests_2.11 - pom - Spark Project Java 8 Tests - - - java8-tests - - - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-core_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-streaming_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-sql_${scala.binary.version} - ${project.version} - - - org.apache.spark - spark-sql_${scala.binary.version} - ${project.version} - test-jar - test - - - org.apache.spark - spark-tags_${scala.binary.version} - - - - - org.apache.spark - spark-tags_${scala.binary.version} - test-jar - test - - - - - - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - - - org.apache.maven.plugins - maven-install-plugin - - true - - - - org.apache.maven.plugins - maven-compiler-plugin - - true - 1.8 - 1.8 - 1.8 - - - - net.alchim31.maven - scala-maven-plugin - - ${useZincForJdk8} - - -source - 1.8 - -target - 1.8 - -Xlint:all,-serial,-path - - - - - - diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties deleted file mode 100644 index 3706a6e36130..000000000000 --- a/external/java8-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark_project.jetty=WARN diff --git a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala deleted file mode 100644 index c4042e47e84e..000000000000 --- a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.java8 - -import org.apache.spark.SharedSparkContext -import org.apache.spark.SparkFunSuite - -/** - * Test cases where JDK8-compiled Scala user code is used with Spark. - */ -class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext { - test("basic RDD closure test (SPARK-6152)") { - sc.parallelize(1 to 1000).map(x => x * x).count() - } -} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 02b23111af78..9c5dceca2d17 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -259,7 +259,7 @@ private[kafka010] class KafkaSource( val preferredLoc = if (numExecutors > 0) { // This allows cached KafkaConsumers in the executors to be re-used to read the same // partition in every batch. 
- Some(sortedExecutors(floorMod(tp.hashCode, numExecutors))) + Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors))) } else None KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc) }.filter { range => @@ -347,5 +347,4 @@ private[kafka010] object KafkaSource { if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host } } - def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index bf8adbe42f3f..4c6e2ce87e29 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -145,11 +145,6 @@ private[spark] class KafkaRDD[K, V]( a.host > b.host } - /** - * Non-negative modulus, from java 8 math - */ - private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b - override def getPreferredLocations(thePart: Partition): Seq[String] = { // The intention is best-effort consistent executor for a given topicpartition, // so that caching consumers can be effective. @@ -164,7 +159,7 @@ private[spark] class KafkaRDD[K, V]( Seq() } else { // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index - val index = this.floorMod(tp.hashCode, execs.length) + val index = Math.floorMod(tp.hashCode, execs.length) val chosen = execs(index) Seq(chosen.toString) } diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 0622fef17c8d..bc8d6037a367 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -104,15 +104,12 @@ List buildJavaCommand(String extraClassPath) throws IOException { // Load extra JAVA_OPTS from conf/java-opts, if it exists. File javaOpts = new File(join(File.separator, getConfDir(), "java-opts")); if (javaOpts.isFile()) { - BufferedReader br = new BufferedReader(new InputStreamReader( - new FileInputStream(javaOpts), StandardCharsets.UTF_8)); - try { + try (BufferedReader br = new BufferedReader(new InputStreamReader( + new FileInputStream(javaOpts), StandardCharsets.UTF_8))) { String line; while ((line = br.readLine()) != null) { addOptionString(cmd, line); } - } finally { - br.close(); } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index c0779e1c4e9a..12bf29d3b1aa 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,7 +18,6 @@ package org.apache.spark.launcher; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; @@ -103,14 +102,7 @@ public synchronized void kill() { try { childProc.exitValue(); } catch (IllegalThreadStateException e) { - // Child is still alive. Try to use Java 8's "destroyForcibly()" if available, - // fall back to the old API if it's not there. 
- try { - Method destroy = childProc.getClass().getMethod("destroyForcibly"); - destroy.invoke(childProc); - } catch (Exception inner) { - childProc.destroy(); - } + childProc.destroyForcibly(); } finally { childProc = null; } diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java index 250b2a882feb..e14c8aa47d5f 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java +++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java @@ -312,27 +312,6 @@ static String quoteForCommandString(String s) { return quoted.append('"').toString(); } - /** - * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't - * set it. - */ - static void addPermGenSizeOpt(List cmd) { - // Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later. - if (getJavaVendor() == JavaVendor.IBM) { - return; - } - if (javaMajorVersion(System.getProperty("java.version")) > 7) { - return; - } - for (String arg : cmd) { - if (arg.contains("-XX:MaxPermSize=")) { - return; - } - } - - cmd.add("-XX:MaxPermSize=256m"); - } - /** * Get the major version of the java version string supplied. This method * accepts any JEP-223-compliant strings (9-ea, 9+100), as well as legacy diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index ae43f563e8b4..865d4926da6a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -137,12 +137,7 @@ private LauncherServer() throws IOException { this.server = server; this.running = true; - this.serverThread = factory.newThread(new Runnable() { - @Override - public void run() { - acceptConnections(); - } - }); + this.serverThread = factory.newThread(this::acceptConnections); serverThread.start(); } catch (IOException ioe) { close(); diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index c7959aee9f88..ff8045390c15 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -44,12 +44,7 @@ class OutputRedirector { OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this.active = true; this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); - this.thread = tf.newThread(new Runnable() { - @Override - public void run() { - redirect(); - } - }); + this.thread = tf.newThread(this::redirect); this.sink = Logger.getLogger(loggerName); thread.start(); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index 0aa7bd197d16..cefb4d1a95fb 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -91,9 +91,6 @@ public boolean isFinal() { * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send * a {@link #stop()} message to the application, so it's recommended that users first try to * stop the application cleanly and only resort to this method if that fails. - *

    - * Note that if the application is running as a child process, this method fail to kill the - * process when using Java 7. This may happen if, for example, the application is deadlocked. */ void kill(); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 82b593a3f797..81786841de22 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -49,35 +49,44 @@ public List buildCommand(Map env) // Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. - if (className.equals("org.apache.spark.deploy.master.Master")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_MASTER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.worker.Worker")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_WORKER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_HISTORY_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; - } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") || - className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - memKey = "SPARK_DRIVER_MEMORY"; + switch (className) { + case "org.apache.spark.deploy.master.Master": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_MASTER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.deploy.worker.Worker": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_WORKER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.deploy.history.HistoryServer": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_HISTORY_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.executor.CoarseGrainedExecutorBackend": + javaOptsKeys.add("SPARK_JAVA_OPTS"); + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + break; + case "org.apache.spark.executor.MesosExecutorBackend": + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + break; + case "org.apache.spark.deploy.mesos.MesosClusterDispatcher": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + break; + case "org.apache.spark.deploy.ExternalShuffleService": + case "org.apache.spark.deploy.mesos.MesosExternalShuffleService": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + default: + 
javaOptsKeys.add("SPARK_JAVA_OPTS"); + memKey = "SPARK_DRIVER_MEMORY"; + break; } List cmd = buildJavaCommand(extraClassPath); @@ -94,7 +103,6 @@ public List buildCommand(Map env) String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM); cmd.add("-Xmx" + mem); - addPermGenSizeOpt(cmd); cmd.add(className); cmd.addAll(classArgs); return cmd; diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index 29c6d82cdbf1..5e64fa7ed152 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -271,7 +271,6 @@ private List buildSparkSubmitCommand(Map env) config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH)); } - addPermGenSizeOpt(cmd); cmd.add("org.apache.spark.deploy.SparkSubmit"); cmd.addAll(buildSparkSubmitArgs()); return cmd; @@ -405,49 +404,65 @@ private class OptionParser extends SparkSubmitOptionParser { @Override protected boolean handle(String opt, String value) { - if (opt.equals(MASTER)) { - master = value; - } else if (opt.equals(DEPLOY_MODE)) { - deployMode = value; - } else if (opt.equals(PROPERTIES_FILE)) { - propertiesFile = value; - } else if (opt.equals(DRIVER_MEMORY)) { - conf.put(SparkLauncher.DRIVER_MEMORY, value); - } else if (opt.equals(DRIVER_JAVA_OPTIONS)) { - conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); - } else if (opt.equals(DRIVER_LIBRARY_PATH)) { - conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); - } else if (opt.equals(DRIVER_CLASS_PATH)) { - conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value); - } else if (opt.equals(CONF)) { - String[] setConf = value.split("=", 2); - checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value); - conf.put(setConf[0], setConf[1]); - } else if (opt.equals(CLASS)) { - // The special classes require some special command line handling, since they allow - // mixing spark-submit arguments with arguments that should be propagated to the shell - // itself. Note that for this to work, the "--class" argument must come before any - // non-spark-submit arguments. 
- mainClass = value; - if (specialClasses.containsKey(value)) { - allowsMixedArguments = true; - appResource = specialClasses.get(value); - } - } else if (opt.equals(KILL_SUBMISSION) || opt.equals(STATUS)) { - isAppResourceReq = false; - sparkArgs.add(opt); - sparkArgs.add(value); - } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) { - isAppResourceReq = false; - sparkArgs.add(opt); - } else if (opt.equals(VERSION)) { - isAppResourceReq = false; - sparkArgs.add(opt); - } else { - sparkArgs.add(opt); - if (value != null) { + switch (opt) { + case MASTER: + master = value; + break; + case DEPLOY_MODE: + deployMode = value; + break; + case PROPERTIES_FILE: + propertiesFile = value; + break; + case DRIVER_MEMORY: + conf.put(SparkLauncher.DRIVER_MEMORY, value); + break; + case DRIVER_JAVA_OPTIONS: + conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); + break; + case DRIVER_LIBRARY_PATH: + conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); + break; + case DRIVER_CLASS_PATH: + conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value); + break; + case CONF: + String[] setConf = value.split("=", 2); + checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value); + conf.put(setConf[0], setConf[1]); + break; + case CLASS: + // The special classes require some special command line handling, since they allow + // mixing spark-submit arguments with arguments that should be propagated to the shell + // itself. Note that for this to work, the "--class" argument must come before any + // non-spark-submit arguments. + mainClass = value; + if (specialClasses.containsKey(value)) { + allowsMixedArguments = true; + appResource = specialClasses.get(value); + } + break; + case KILL_SUBMISSION: + case STATUS: + isAppResourceReq = false; + sparkArgs.add(opt); sparkArgs.add(value); - } + break; + case HELP: + case USAGE_ERROR: + isAppResourceReq = false; + sparkArgs.add(opt); + break; + case VERSION: + isAppResourceReq = false; + sparkArgs.add(opt); + break; + default: + sparkArgs.add(opt); + if (value != null) { + sparkArgs.add(value); + } + break; } return true; } diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java index caeeea5ec6dd..9795041233b6 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java @@ -99,42 +99,6 @@ public void testJavaMajorVersion() { assertEquals(10, javaMajorVersion("10")); } - @Test - public void testAddPermGenSizeOpt() { - List cmd = new ArrayList<>(); - - if (javaMajorVersion(System.getProperty("java.version")) > 7) { - // Does nothing in Java 8 - addPermGenSizeOpt(cmd); - assertEquals(0, cmd.size()); - cmd.clear(); - - } else { - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertTrue(cmd.get(0).startsWith("-XX:MaxPermSize=")); - cmd.clear(); - - cmd.add("foo"); - addPermGenSizeOpt(cmd); - assertEquals(2, cmd.size()); - assertTrue(cmd.get(1).startsWith("-XX:MaxPermSize=")); - cmd.clear(); - - cmd.add("-XX:MaxPermSize=512m"); - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertEquals("-XX:MaxPermSize=512m", cmd.get(0)); - cmd.clear(); - - cmd.add("'-XX:MaxPermSize=512m'"); - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertEquals("'-XX:MaxPermSize=512m'", cmd.get(0)); - cmd.clear(); - } - } - private static void testOpt(String opts, List expected) { 
assertEquals(String.format("test string failed to parse: [[ %s ]]", opts), expected, parseOptionString(opts)); diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index ad2e7a70c4ea..d569b6688dec 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -233,7 +233,7 @@ private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) th launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath()); launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver"); - launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m"); + launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native"); } else { launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home") @@ -258,12 +258,6 @@ private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) th assertFalse("Memory arguments should not be set.", found); } - for (String arg : cmd) { - if (arg.startsWith("-XX:MaxPermSize=")) { - assertEquals("-XX:MaxPermSize=256m", arg); - } - } - String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator)); if (isDriver) { assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp)); diff --git a/launcher/src/test/resources/spark-defaults.conf b/launcher/src/test/resources/spark-defaults.conf index 239fc57883e9..3a51208c7c24 100644 --- a/launcher/src/test/resources/spark-defaults.conf +++ b/launcher/src/test/resources/spark-defaults.conf @@ -17,5 +17,5 @@ spark.driver.memory=1g spark.driver.extraClassPath=/driver -spark.driver.extraJavaOptions=-Ddriver -XX:MaxPermSize=256m +spark.driver.extraJavaOptions=-Ddriver spark.driver.extraLibraryPath=/native \ No newline at end of file diff --git a/pom.xml b/pom.xml index ac61a57a613c..60e4c7269eaf 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ UTF-8 UTF-8 - 1.7 + 1.8 3.3.9 spark 1.7.16 @@ -186,9 +186,6 @@ ${java.home} - - true - org.spark_project @@ -219,8 +216,6 @@ --> ${session.executionRootDirectory} - 64m - 512m 512m @@ -1920,7 +1915,7 @@ org.codehaus.mojo build-helper-maven-plugin - 1.12 + 3.0.0 net.alchim31.maven @@ -1967,8 +1962,6 @@ -Xms1024m -Xmx1024m - -XX:PermSize=${PermGen} - -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize} @@ -1983,7 +1976,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.6.0 + 3.6.1 ${java.version} ${java.version} @@ -2014,7 +2007,7 @@ **/*Suite.java ${project.build.directory}/surefire-reports - -Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + -Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize} - - - scala-compile-first - - - -javabootclasspath - ${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar - - - - - scala-test-compile-first - - - -javabootclasspath - ${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar - - - - - - - - - - scala-2.11 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index bcc00fa3e953..b48879faa4fb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -56,9 +56,9 @@ object BuildCommons { "tags", "sketch" 
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects - val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl, + val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests) = - Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", + Seq("mesos", "yarn", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = @@ -233,8 +233,8 @@ object SparkBuild extends PomBuild { if (major >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty }, - javacJVMVersion := "1.7", - scalacJVMVersion := "1.7", + javacJVMVersion := "1.8", + scalacJVMVersion := "1.8", javacOptions in Compile ++= Seq( "-encoding", "UTF-8", @@ -245,24 +245,12 @@ object SparkBuild extends PomBuild { // additional discussion and explanation. javacOptions in (Compile, compile) ++= Seq( "-target", javacJVMVersion.value - ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 => - if (javacJVMVersion.value == "1.7") { - Seq("-bootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar") - } else { - Nil - } - }, + ), scalacOptions in Compile ++= Seq( s"-target:jvm-${scalacJVMVersion.value}", "-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath // Required for relative source links in scaladoc - ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 => - if (javacJVMVersion.value == "1.7") { - Seq("-javabootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar") - } else { - Nil - } - }, + ), // Implements -Xfatal-warnings, ignoring deprecation warnings. // Code snippet taken from https://issues.scala-lang.org/browse/SI-8410. 
@@ -363,8 +351,6 @@ object SparkBuild extends PomBuild { enable(Flume.settings)(streamingFlumeSink) - enable(Java8TestSettings.settings)(java8Tests) - // SPARK-14738 - Remove docker tests from main Spark build // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) @@ -387,7 +373,7 @@ object SparkBuild extends PomBuild { fork := true, outputStrategy in run := Some (StdoutOutput), - javaOptions ++= Seq("-Xmx2G", "-XX:MaxPermSize=256m"), + javaOptions += "-Xmx2g", sparkShell := { (runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value @@ -531,7 +517,6 @@ object SQL { object Hive { lazy val settings = Seq( - javaOptions += "-XX:MaxPermSize=256m", // Specially disable assertions since some Hive tests fail them javaOptions in Test := (javaOptions in Test).value.filterNot(_ == "-ea"), // Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings @@ -765,16 +750,6 @@ object CopyDependencies { } -object Java8TestSettings { - import BuildCommons._ - - lazy val settings = Seq( - javacJVMVersion := "1.8", - // Targeting Java 8 bytecode is only supported in Scala 2.11.4 and higher: - scalacJVMVersion := (if (System.getProperty("scala-2.10") == "true") "1.7" else "1.8") - ) -} - object TestSettings { import BuildCommons._ @@ -812,7 +787,7 @@ object TestSettings { javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test += "-ea", - javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g" + javaOptions in Test ++= "-Xmx3g -Xss4096k" .split(" ").toSeq, javaOptions += "-Xmx3g", // Exclude tags defined in a system property diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 70826ed326ba..b63c726c8560 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -904,7 +904,6 @@ private[spark] class Client( // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts) val userClass = if (isClusterMode) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index b55b4b147bb7..ee85c043b8bc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.network.util.JavaUtils import org.apache.spark.util.Utils @@ -190,7 +189,6 @@ private[yarn] class ExecutorRunnable( // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts) val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri => val absPath = diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala index 6c3556a2ee43..0c3d080cca25 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala @@ -38,16 +38,4 @@ private[spark] object YarnCommandBuilderUtils { CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true) } - /** - * Adds the perm gen configuration to the list of java options if needed and not yet added. - * - * Note that this method adds the option based on the local JVM version; if the node where - * the container is running has a different Java version, there's a risk that the option will - * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use - * Java 7). - */ - def addPermGenSizeOpt(args: ListBuffer[String]): Unit = { - CommandBuilderUtils.addPermGenSizeOpt(args.asJava) - } - } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 465fb83669a7..089c84d5f773 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -134,12 +134,8 @@ final class Decimal extends Ordered[Decimal] with Serializable { * Set this Decimal to the given BigInteger value. Will have precision 38 and scale 0. */ def set(bigintval: BigInteger): Decimal = { - // TODO: Remove this once we migrate to java8 and use longValueExact() instead. - require( - bigintval.compareTo(LONG_MAX_BIG_INT) <= 0 && bigintval.compareTo(LONG_MIN_BIG_INT) >= 0, - s"BigInteger $bigintval too large for decimal") this.decimalVal = null - this.longVal = bigintval.longValue() + this.longVal = bigintval.longValueExact() this._precision = DecimalType.MAX_PRECISION this._scale = 0 this @@ -178,7 +174,7 @@ final class Decimal extends Ordered[Decimal] with Serializable { def toUnscaledLong: Long = { if (decimalVal.ne(null)) { - decimalVal.underlying().unscaledValue().longValue() + decimalVal.underlying().unscaledValue().longValueExact() } else { longVal } diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java index 2570c8d02ab7..d44af7ef4815 100644 --- a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java +++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java @@ -22,13 +22,13 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.KeyedState; /** * ::Experimental:: * Base interface for a map function used in - * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(FlatMapGroupsWithStateFunction, Encoder, Encoder)}. + * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState( + * FlatMapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}. 
* @since 2.1.1 */ @Experimental diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java index 614d3925e051..75986d170620 100644 --- a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java +++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java @@ -22,13 +22,13 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.KeyedState; /** * ::Experimental:: * Base interface for a map function used in - * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState(MapGroupsWithStateFunction, Encoder, Encoder)} + * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState( + * MapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)} * @since 2.1.1 */ @Experimental diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index ce6e8be8b0ab..d30f6f325332 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -103,7 +103,7 @@ private[sql] object Dataset { * the following creates a new Dataset by applying a filter on the existing one: * {{{ * val names = people.map(_.name) // in Scala; names is a Dataset[String] - * Dataset names = people.map((Person p) -> p.name, Encoders.STRING)); // in Java 8 + * Dataset names = people.map((Person p) -> p.name, Encoders.STRING)); * }}} * * Dataset operations can also be untyped, through various domain-specific-language (DSL) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 94e689a4d5b9..3a548c251f5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -98,7 +98,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( * // Create Integer values grouped by String key from a Dataset> * Dataset> ds = ...; * KeyValueGroupedDataset grouped = - * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT()); // Java 8 + * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT()); * }}} * * @since 2.1.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ea465e2c834d..dbe55090ea11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -190,17 +190,6 @@ class SQLContext private[sql](val sparkSession: SparkSession) * The following example registers a UDF in Java: * {{{ * sqlContext.udf().register("myUDF", - * new UDF2() { - * @Override - * public String call(Integer arg1, String arg2) { - * return arg2 + arg1; - * } - * }, DataTypes.StringType); - * }}} - * - * Or, to use Java 8 lambda syntax: - * {{{ - * sqlContext.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index e1fdb2f2876b..1975a56cafe8 
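
The SQLContext Scaladoc (and, below, the matching SparkSession one) is trimmed to the lambda-only form because every supported Java version now accepts it. For reference, a Scala equivalent of the surviving Java example, as a standalone sketch (assumes a local SparkSession is acceptable; the object and app names are illustrative):

import org.apache.spark.sql.SparkSession

object UdfLambdaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("udf-demo").getOrCreate()

    // Same UDF as the doc example: append the integer argument to the string argument.
    spark.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)
    spark.sql("SELECT myUDF(1, 'value')").show()  // single row containing "value1"

    spark.stop()
  }
}
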
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -164,17 +164,6 @@ class SparkSession private( * The following example registers a UDF in Java: * {{{ * sparkSession.udf().register("myUDF", - * new UDF2() { - * @Override - * public String call(Integer arg1, String arg2) { - * return arg2 + arg1; - * } - * }, DataTypes.StringType); - * }}} - * - * Or, to use Java 8 lambda syntax: - * {{{ - * sparkSession.udf().register("myUDF", * (Integer arg1, String arg2) -> arg2 + arg1, * DataTypes.StringType); * }}} diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java similarity index 96% rename from external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java rename to sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java index 10d25fa4458a..8b8a403e2b19 100644 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package test.org.apache.spark.java8.sql; +package test.org.apache.spark.sql; import java.util.Arrays; @@ -26,7 +26,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.KeyValueGroupedDataset; import org.apache.spark.sql.expressions.javalang.typed; -import test.org.apache.spark.sql.JavaDatasetAggregatorSuiteBase; /** * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax. diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 5ef4e887ded0..a94a37cb21b3 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -228,7 +228,7 @@ public Iterator call(Integer key, Iterator values) { Dataset mapped2 = grouped.mapGroupsWithState( new MapGroupsWithStateFunction() { @Override - public String call(Integer key, Iterator values, KeyedState s) throws Exception { + public String call(Integer key, Iterator values, KeyedState s) { StringBuilder sb = new StringBuilder(key.toString()); while (values.hasNext()) { sb.append(values.next()); diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 9aedaf234ec1..0f249d7d5935 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -190,6 +190,7 @@ org.codehaus.mojo build-helper-maven-plugin + 3.0.0 add-scala-test-sources @@ -219,7 +220,7 @@ scalatest-maven-plugin - -da -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + -da -Xmx3g -XX:ReservedCodeCacheSize=${CodeCacheSize} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala index e7c165c5f86c..d786a610f153 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala @@ -137,21 +137,13 @@ case class ScriptTransformationExec( throw writerThread.exception.get } - // Checks if the proc is still alive (incase the command ran was bad) - // The ideal way to do this is to use Java 
8's Process#isAlive() - // but it cannot be used because Spark still supports Java 7. - // Following is a workaround used to check if a process is alive in Java 7 - // TODO: Once builds are switched to Java 8, this can be changed - try { + if (!proc.isAlive) { val exitCode = proc.exitValue() if (exitCode != 0) { logError(stderrBuffer.toString) // log the stderr circular buffer throw new SparkException(s"Subprocess exited with status $exitCode. " + s"Error: ${stderrBuffer.toString}", cause) } - } catch { - case _: IllegalThreadStateException => - // This means that the process is still alive. Move ahead } } diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java similarity index 98% rename from external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java rename to streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java index 338ca54ab829..646cb97066f3 100644 --- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -15,11 +15,17 @@ * limitations under the License. */ -package test.org.apache.spark.java8.dstream; +package test.org.apache.spark.streaming; import java.io.Serializable; import java.util.*; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.JavaTestUtils; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.StateSpec; +import org.apache.spark.streaming.Time; import scala.Tuple2; import com.google.common.collect.Lists; @@ -32,7 +38,6 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; @@ -139,7 +144,7 @@ public void testReduceByWindow() { JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); + (x, y) -> x - y, new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List> result = JavaTestUtils.runStreams(ssc, 4, 4); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java similarity index 99% rename from streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java rename to streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index 648a5abe0b89..8d24104d7870 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -15,13 +15,21 @@ * limitations under the License. 
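
With Java 8 required, ScriptTransformationExec can call Process#isAlive() directly, so the Java 7 workaround of probing exitValue() and swallowing IllegalThreadStateException goes away. A standalone Scala sketch of the pattern (not Spark code; assumes a Unix-like `sleep` binary on the PATH):

object ProcessIsAliveDemo {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("sleep", "1").start()
    println(s"running: ${proc.isAlive}")  // true while the child is still sleeping

    proc.waitFor()
    if (!proc.isAlive) {
      val exitCode = proc.exitValue()     // safe: the process has terminated
      println(s"exited with status $exitCode")
    }
  }
}
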
*/ -package org.apache.spark.streaming; +package test.org.apache.spark.streaming; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.JavaCheckpointTestUtils; +import org.apache.spark.streaming.JavaTestUtils; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.Seconds; +import org.apache.spark.streaming.StreamingContextState; +import org.apache.spark.streaming.StreamingContextSuite; +import org.apache.spark.streaming.Time; import scala.Tuple2; import org.apache.hadoop.conf.Configuration;
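
A common reason for keeping Java suites such as JavaAPISuite under test.org.apache.spark.* rather than the package they exercise is that, from a foreign package, only the public API is visible, so the tests cannot quietly rely on package-private members; the explicit org.apache.spark.streaming imports added above follow from that move. A small Scala illustration of the visibility rule, using hypothetical names:

// Two top-level package blocks in one file, purely for demonstration.
package org.apache.spark.streaming {
  object Internals {
    private[streaming] val tickMs = 50   // visible only inside org.apache.spark.streaming
    val publicName = "streaming"         // public, visible everywhere
  }
}

package test.org.apache.spark.streaming {
  object Consumer {
    // _root_ avoids any ambiguity with the enclosing test.org package.
    import _root_.org.apache.spark.streaming.Internals

    def describe(): String = Internals.publicName
    // Internals.tickMs would not compile here: it is private[streaming].
  }
}
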