Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 64 additions & 10 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import sun.misc.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
Expand Down Expand Up @@ -67,6 +67,60 @@ public final class Platform {
unaligned = _unaligned;
}

// Access fields and constructors once and store them, for performance:

private static final Constructor<?> DBB_CONSTRUCTOR;
private static final Field DBB_CLEANER_FIELD;
static {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
DBB_CONSTRUCTOR = constructor;
DBB_CLEANER_FIELD = cleanerField;
} catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
throw new IllegalStateException(e);
}
}

private static final Method CLEANER_CREATE_METHOD;
static {
// The implementation of Cleaner changed from JDK 8 to 9
Copy link
Contributor

@skonto skonto Nov 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to reference java 9, its dead right and it will have no LTS AFAIK.
Shouldnt we only support LTS?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this comment is just noting that the change happened between Java 8 and 9. We are targeting support for Java 11. However I expect virtually all of the changes that we have to make are due to changes in Java 9. (And I have no reason to believe Java 9-10 wouldn't work; we want them to work too)

// Split java.version on non-digit chars:
int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
String cleanerClassName;
if (majorVersion < 9) {
cleanerClassName = "sun.misc.Cleaner";
} else {
cleanerClassName = "jdk.internal.ref.Cleaner";
}
try {
Class<?> cleanerClass = Class.forName(cleanerClassName);
Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
// Accessing jdk.internal.ref.Cleaner should actually fail by default in JDK 9+,
// unfortunately, unless the user has allowed access with something like
// --add-opens java.base/java.lang=ALL-UNNAMED If not, we can't really use the Cleaner
// hack below. It doesn't break, just means the user might run into the default JVM limit
// on off-heap memory and increase it or set the flag above. This tests whether it's
// available:
try {
createMethod.invoke(null, null, null);
} catch (IllegalAccessException e) {
// Don't throw an exception, but can't log here?
createMethod = null;
} catch (InvocationTargetException ite) {
// shouldn't happen; report it
throw new IllegalStateException(ite);
}
CLEANER_CREATE_METHOD = createMethod;
} catch (ClassNotFoundException | NoSuchMethodException e) {
throw new IllegalStateException(e);
}

}

/**
* @return true when running JVM is having sun's Unsafe package available in it and underlying
* system having unaligned-access capability.
Expand Down Expand Up @@ -159,18 +213,18 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
* to increase it).
*/
@SuppressWarnings("unchecked")
public static ByteBuffer allocateDirectBuffer(int size) {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
if (CLEANER_CREATE_METHOD != null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #23424 ; I now think this was an error.

try {
DBB_CLEANER_FIELD.set(buffer,
CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
return buffer;
} catch (Exception e) {
throwException(e);
Expand Down
36 changes: 29 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.Map
import scala.collection.mutable

import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
* Storage information for each BlockManager.
Expand Down Expand Up @@ -193,6 +196,31 @@ private[spark] class StorageStatus(

/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {

// In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
// to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
// jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
// reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
// still accessible with reflection.
private val bufferCleaner: DirectBuffer => Unit =
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
val cleanerMethod =
Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
unsafeField.setAccessible(true)
val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
} else {
val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean")
buffer: DirectBuffer => {
// Careful to avoid the return type of .cleaner(), which changes with JDK
val cleaner: AnyRef = buffer.cleaner()
if (cleaner != null) {
cleanerMethod.invoke(cleaner)
}
}
}

/**
* Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
* API that will cause errors if one attempts to read from the disposed buffer. However, neither
Expand All @@ -204,14 +232,8 @@ private[spark] object StorageUtils extends Logging {
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logTrace(s"Disposing of $buffer")
cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
bufferCleaner(buffer.asInstanceOf[DirectBuffer])
}
}

private def cleanDirectBuffer(buffer: DirectBuffer) = {
val cleaner = buffer.cleaner()
if (cleaner != null) {
cleaner.clean()
}
}
}
7 changes: 2 additions & 5 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.util.Random

import com.google.common.io.Files
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -932,10 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
signal(pid, "SIGKILL")
}

val versionParts = System.getProperty("java.version").split("[+.\\-]+", 3)
var majorVersion = versionParts(0).toInt
if (majorVersion == 1) majorVersion = versionParts(1).toInt
if (majorVersion >= 8) {
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_1_8)) {
// We'll make sure that forcibly terminating a process works by
// creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
// older versions of java, this will *not* terminate.
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ commons-digester-1.8.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-lang3-3.8.1.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-3.1.jar
Expand Down
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3.1
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ commons-dbcp-1.4.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.5.jar
commons-lang3-3.8.1.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-3.1.jar
Expand Down Expand Up @@ -116,8 +116,8 @@ jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jetty-webapp-9.3.24.v20180605.jar
jetty-xml-9.3.24.v20180605.jar
jetty-webapp-9.4.12.v20180830.jar
jetty-xml-9.4.12.v20180830.jar
jline-2.14.6.jar
joda-time-2.9.3.jar
jodd-core-3.5.2.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ object LogQuery {
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.lines.mkString,
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.split('\n').mkString,
"""10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
| HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.lines.mkString
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.split('\n').mkString
)

def main(args: Array[String]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,10 +862,10 @@ class MatricesSuite extends SparkMLFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))

lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,10 @@ class MatricesSuite extends SparkFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))

lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

Expand Down
22 changes: 17 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<orc.version>1.5.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.24.v20180605</jetty.version>
<jetty.version>9.4.12.v20180830</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
<chill.version>0.9.3</chill.version>
<ivy.version>2.4.0</ivy.version>
Expand Down Expand Up @@ -166,7 +166,7 @@
<!-- org.apache.commons/commons-lang/-->
<commons-lang2.version>2.6</commons-lang2.version>
<!-- org.apache.commons/commons-lang3/-->
<commons-lang3.version>3.5</commons-lang3.version>
<commons-lang3.version>3.8.1</commons-lang3.version>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
<janino.version>3.0.10</janino.version>
<jersey.version>2.22.2</jersey.version>
Expand Down Expand Up @@ -2016,7 +2016,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 3.3.1 won't work with zinc; fails to find javac from java.home -->
<version>3.2.2</version>
<version>3.4.4</version>
<executions>
<execution>
<id>eclipse-add-source</id>
Expand Down Expand Up @@ -2281,7 +2281,19 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>7.0</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
<version>7.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -2296,7 +2308,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.0.2</version>
<version>3.1.1</version>
<executions>
<execution>
<id>default-cli</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hive.service.cli.thrift;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void run() {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
Expand Down