Closed

59 commits
480a74a
Initial import of code from Databricks unsafe utils repo.
JoshRosen Apr 17, 2015
ab68e08
Begin merging the UTF8String implementations.
JoshRosen Apr 18, 2015
f03e9c1
Play around with Unsafe implementations of more string methods.
JoshRosen Apr 18, 2015
5d55cef
Add skeleton for Row implementation.
JoshRosen Apr 18, 2015
8a8f9df
Add skeleton for GeneratedAggregate integration.
JoshRosen Apr 18, 2015
1ff814d
Add reminder to free memory on iterator completion
JoshRosen Apr 18, 2015
53ba9b7
Start prototyping Java Row -> UnsafeRow converters
JoshRosen Apr 19, 2015
fc4c3a8
Sketch how the converters will be used in UnsafeGeneratedAggregate
JoshRosen Apr 19, 2015
1a483c5
First version that passes some aggregation tests:
JoshRosen Apr 19, 2015
079f1bf
Some clarification of the BytesToBytesMap.lookup() / set() contract.
JoshRosen Apr 19, 2015
f764d13
Simplify address + length calculation in Location.
JoshRosen Apr 19, 2015
c754ae1
Now that the store*() contract has been strengthened, we can remove an…
JoshRosen Apr 19, 2015
ae39694
Add finalizer as "cleanup method of last resort"
JoshRosen Apr 19, 2015
c7f0b56
Reuse UnsafeRow pointer in UnsafeRowConverter
JoshRosen Apr 20, 2015
62ab054
Optimize for fact that get() is only called on String columns.
JoshRosen Apr 20, 2015
c55bf66
Free buffer once iterator has been fully consumed.
JoshRosen Apr 20, 2015
738fa33
Add feature flag to guard UnsafeGeneratedAggregate
JoshRosen Apr 20, 2015
c1b3813
Fix bug in UnsafeMemoryAllocator.free():
JoshRosen Apr 20, 2015
7df6008
Optimizations related to zeroing out memory:
JoshRosen Apr 21, 2015
58ac393
Use UNSAFE allocator in GeneratedAggregate (TODO: make this configura…
JoshRosen Apr 21, 2015
d2bb986
Update to implement new Row methods added upstream
JoshRosen Apr 22, 2015
b3eaccd
Extract aggregation map into its own class.
JoshRosen Apr 22, 2015
bade966
Comment update (bumping to refresh GitHub cache...)
JoshRosen Apr 22, 2015
d85eeff
Add basic sanity test for UnsafeFixedWidthAggregationMap
JoshRosen Apr 22, 2015
1f4b716
Merge Unsafe code into the regular GeneratedAggregate, guarded by a
JoshRosen Apr 22, 2015
92d5a06
Address a number of minor code review comments.
JoshRosen Apr 23, 2015
628f936
Use ints instead of longs for indexing.
JoshRosen Apr 23, 2015
23a440a
Bump up default hash map size
JoshRosen Apr 23, 2015
765243d
Enable optional performance metrics for hash map.
JoshRosen Apr 23, 2015
b26f1d3
Fix bug in murmur hash implementation.
JoshRosen Apr 23, 2015
49aed30
More long -> int conversion.
JoshRosen Apr 23, 2015
29a7575
Remove debug logging
JoshRosen Apr 24, 2015
ef6b3d3
Fix a bunch of FindBugs and IntelliJ inspections
JoshRosen Apr 24, 2015
06e929d
More warning cleanup
JoshRosen Apr 24, 2015
854201a
Import and comment cleanup
JoshRosen Apr 24, 2015
f3dcbfe
More mod replacement
JoshRosen Apr 24, 2015
afe8dca
Some Javadoc cleanup
JoshRosen Apr 24, 2015
a95291e
Cleanups to string handling code
JoshRosen Apr 24, 2015
31eaabc
Lots of TODO and doc cleanup.
JoshRosen Apr 24, 2015
6ffdaa1
Null handling improvements in UnsafeRow.
JoshRosen Apr 24, 2015
9c19fc0
Add configuration options for heap vs. offheap
JoshRosen Apr 24, 2015
cde4132
Add missing pom.xml
JoshRosen Apr 26, 2015
0925847
Disable MiMa checks for new unsafe module
JoshRosen Apr 27, 2015
a8e4a3f
Introduce MemoryManager interface; add to SparkEnv.
JoshRosen Apr 28, 2015
b45f070
Don't redundantly store the offset from key to value, since we can co…
JoshRosen Apr 28, 2015
162caf7
Fix test compilation
JoshRosen Apr 28, 2015
3ca84b2
Only zero the used portion of groupingKeyConversionScratchSpace
JoshRosen Apr 28, 2015
529e571
Measure timeSpentResizing in nanoseconds instead of milliseconds.
JoshRosen Apr 28, 2015
ce3c565
More comments, formatting, and code cleanup.
JoshRosen Apr 28, 2015
78a5b84
Add logging to MemoryManager
JoshRosen Apr 28, 2015
a19e066
Rename unsafe Java test suites to match Scala test naming convention.
JoshRosen Apr 28, 2015
de5e001
Fix debug vs. trace in logging message.
JoshRosen Apr 28, 2015
6e4b192
Remove an unused method from ByteArrayMethods.
JoshRosen Apr 28, 2015
70a39e4
Split MemoryManager into ExecutorMemoryManager and TaskMemoryManager:
JoshRosen Apr 28, 2015
50e9671
Throw memory leak warning even in case of error; add warning about co…
JoshRosen Apr 29, 2015
017b2dc
Remove BytesToBytesMap.finalize()
JoshRosen Apr 29, 2015
1bc36cc
Refactor UnsafeRowConverter to avoid unnecessary boxing.
JoshRosen Apr 29, 2015
81f34f8
Follow 'place children last' convention for GeneratedAggregate
JoshRosen Apr 29, 2015
eeee512
Add converters for Null, Boolean, Byte, and Short columns.
JoshRosen Apr 29, 2015
5 changes: 5 additions & 0 deletions core/pom.xml
@@ -91,6 +91,11 @@
<artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
import org.apache.spark.util.{RpcUtils, Utils}

/**
@@ -69,6 +70,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
val executorMemoryManager: ExecutorMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

val executorMemoryManager: ExecutorMemoryManager = {
val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
MemoryAllocator.UNSAFE
} else {
MemoryAllocator.HEAP
}
new ExecutorMemoryManager(allocator)
}

val envInstance = new SparkEnv(
executorId,
rpcEnv,
@@ -398,6 +409,7 @@
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
executorMemoryManager,
outputCommitCoordinator,
conf)

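The SparkEnv hunk above makes the allocator selection configurable: spark.unsafe.offHeap (default false) picks between MemoryAllocator.HEAP and MemoryAllocator.UNSAFE for the executor-wide ExecutorMemoryManager. As a usage sketch — the flag name and default come from the diff, while the app name and context setup are purely illustrative — off-heap allocation would be opted into like this:

    // Hypothetical usage sketch: opting in to off-heap managed memory.
    // "spark.unsafe.offHeap" defaults to false, so executors use MemoryAllocator.HEAP unless set.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("unsafe-offheap-demo")       // illustrative name only
      .set("spark.unsafe.offHeap", "true")     // SparkEnv will choose MemoryAllocator.UNSAFE
    val sc = new SparkContext(conf)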
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,7 @@ import java.io.Serializable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener


@@ -133,4 +134,9 @@ abstract class TaskContext extends Serializable {
/** ::DeveloperApi:: */
@DeveloperApi
def taskMetrics(): TaskMetrics

/**
* Returns the manager for this task's managed memory.
*/
private[spark] def taskMemoryManager(): TaskMemoryManager
}
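The new accessor is private[spark], so only Spark-internal operators can reach a task's managed memory. A rough sketch of how such an operator might use it follows; allocatePage and freePage are assumed APIs from the new unsafe module's TaskMemoryManager and are not shown in the hunks above.

    // Sketch (Spark-internal code only; taskMemoryManager() is private[spark]).
    import org.apache.spark.TaskContext
    import org.apache.spark.unsafe.memory.MemoryBlock

    val memoryManager = TaskContext.get().taskMemoryManager()
    val page: MemoryBlock = memoryManager.allocatePage(1L << 20)  // assumed API: allocate a 1 MB page
    try {
      // ... write fixed-width records into the page ...
    } finally {
      memoryManager.freePage(page)  // assumed API; freeing promptly avoids tripping the leak detector
    }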
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}

import scala.collection.mutable.ArrayBuffer
@@ -27,6 +28,7 @@ private[spark] class TaskContextImpl(
val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._

/**
@@ -179,6 +180,7 @@ private[spark] class Executor(
}

override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
@@ -191,6 +193,7 @@
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.setTaskMemoryManager(taskMemoryManager)

// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
@@ -207,7 +210,21 @@

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
val value = try {
task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
} finally {
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
// when changing this, make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()

// If the task has been killed, let's fail it.
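The cleanup contract distilled from the hunk above: cleanUpAllAllocatedMemory() frees whatever the task left allocated and returns the byte count, and a nonzero count is either logged or escalated to a SparkException depending on spark.unsafe.exceptionOnMemoryLeak. The same pattern is duplicated in DAGScheduler.runLocallyWithinThread below. A minimal sketch, reusing the names already in scope in Executor.run:

    // Sketch of the cleanup contract (names as in Executor.run above).
    // 0 means the task cleaned up after itself; anything else is a managed-memory leak.
    val freedMemory: Long = taskMemoryManager.cleanUpAllAllocatedMemory()
    if (freedMemory > 0 && conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
      // strict mode, enabled for Spark's own test runs; the default merely logs an error
      throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId")
    }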
22 changes: 20 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

@@ -643,15 +644,32 @@ class DAGScheduler(
try {
val rdd = job.finalStage.rdd
val split = rdd.partitions(job.partitions(0))
val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
attemptNumber = 0, runningLocally = true)
val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
val taskContext =
new TaskContextImpl(
job.finalStage.id,
job.partitions(0),
taskAttemptId = 0,
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
runningLocally = true)
TaskContext.setTaskContext(taskContext)
try {
val result = job.func(taskContext, rdd.iterator(split, taskContext))
job.listener.taskSucceeded(0, result)
} finally {
taskContext.markTaskCompleted()
TaskContext.unset()
// Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
// make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
} else {
logError(s"Managed memory leak detected; size = $freedMemory bytes")
}
}
}
} catch {
case e: Exception =>
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.{TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils

@@ -52,8 +53,13 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
* @return the result of the task
*/
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
context = new TaskContextImpl(
stageId = stageId,
partitionId = partitionId,
taskAttemptId = taskAttemptId,
attemptNumber = attemptNumber,
taskMemoryManager = taskMemoryManager,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
taskThread = Thread.currentThread()
@@ -68,6 +74,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
}
}

private var taskMemoryManager: TaskMemoryManager = _

def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = {
this.taskMemoryManager = taskMemoryManager
}

def runTask(context: TaskContext): T

def preferredLocations: Seq[TaskLocation] = Nil
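Taken together with the Executor change above, the injection order matters: the memory manager is set on the freshly deserialized task before run() is called, and run() forwards it into the TaskContextImpl that runTask() sees via TaskContext.get(). A sketch of that sequence, with deserialization and error handling omitted (env, task, taskId and attemptNumber stand for the values in scope in Executor.run):

    // Sketch of the per-task wiring established by the Executor and Task changes.
    import org.apache.spark.unsafe.memory.TaskMemoryManager

    val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
    task.setTaskMemoryManager(taskMemoryManager)   // must precede run(); run() reads this field
    val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
    // run() builds the TaskContextImpl with this manager, so runTask() can reach it
    // through TaskContext.get().taskMemoryManager().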
2 changes: 1 addition & 1 deletion core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1009,7 +1009,7 @@ public void persist() {
@Test
public void iterator() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
TaskContext context = new TaskContextImpl(0, 0, 0L, 0, false, new TaskMetrics());
TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics());
Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
}

8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -65,7 +65,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
// in blockManager.put is a losing battle. You have been warned.
blockManager = sc.env.blockManager
cacheManager = sc.env.cacheManager
val context = new TaskContextImpl(0, 0, 0, 0)
val context = new TaskContextImpl(0, 0, 0, 0, null)
val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
assert(computeValue.toList === List(1, 2, 3, 4))
@@ -77,7 +77,7 @@
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))

val context = new TaskContextImpl(0, 0, 0, 0)
val context = new TaskContextImpl(0, 0, 0, 0, null)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
@@ -86,14 +86,14 @@
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)

val context = new TaskContextImpl(0, 0, 0, 0, true)
val context = new TaskContextImpl(0, 0, 0, 0, null, true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}

test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = new TaskContextImpl(0, 0, 0, 0)
val context = new TaskContextImpl(0, 0, 0, 0, null)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
}
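The test call sites in this file (and in the suites below) simply pass null for the new TaskMemoryManager constructor parameter, since they never touch managed memory. A test that did exercise it could build the real chain instead; this is a sketch under that assumption, not something shown in the diff:

    // Sketch: constructing a real TaskMemoryManager for a test that uses managed memory.
    import org.apache.spark.TaskContextImpl
    import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}

    val taskMemoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
    val context = new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager)
    // ... run the code under test, then verify nothing leaked:
    assert(taskMemoryManager.cleanUpAllAllocatedMemory() == 0L)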
core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -176,7 +176,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
}
val hadoopPart1 = generateFakeHadoopPartition()
val pipedRdd = new PipedRDD(nums, "printenv " + varName)
val tContext = new TaskContextImpl(0, 0, 0, 0)
val tContext = new TaskContextImpl(0, 0, 0, 0, null)
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
assert(arr(0) == "/some/path")
core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -51,7 +51,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
}

test("all TaskCompletionListeners should be called even if some fail") {
val context = new TaskContextImpl(0, 0, 0, 0)
val context = new TaskContextImpl(0, 0, 0, 0, null)
val listener = mock(classOf[TaskCompletionListener])
context.addTaskCompletionListener(_ => throw new Exception("blah"))
context.addTaskCompletionListener(listener)
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -89,7 +89,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
)

val iterator = new ShuffleBlockFetcherIterator(
new TaskContextImpl(0, 0, 0, 0),
new TaskContextImpl(0, 0, 0, 0, null),
transfer,
blockManager,
blocksByAddress,
@@ -154,7 +154,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))

val taskContext = new TaskContextImpl(0, 0, 0, 0)
val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
@@ -217,7 +217,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))

val taskContext = new TaskContextImpl(0, 0, 0, 0)
val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
val iterator = new ShuffleBlockFetcherIterator(
taskContext,
transfer,
2 changes: 2 additions & 0 deletions pom.xml
@@ -97,6 +97,7 @@
<module>sql/catalyst</module>
<module>sql/core</module>
<module>sql/hive</module>
<module>unsafe</module>
<module>assembly</module>
<module>external/twitter</module>
<module>external/flume</module>
@@ -1205,6 +1206,7 @@
<spark.ui.enabled>false</spark.ui.enabled>
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
<spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
<spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
</systemProperties>
<failIfNoTests>false</failIfNoTests>
</configuration>
7 changes: 4 additions & 3 deletions project/SparkBuild.scala
@@ -34,11 +34,11 @@ object BuildCommons {

val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
streamingMqtt, streamingTwitter, streamingZeromq, launcher) =
streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq", "launcher").map(ProjectRef(buildLocation, _))
"streaming-zeromq", "launcher", "unsafe").map(ProjectRef(buildLocation, _))

val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
@@ -159,7 +159,7 @@ object SparkBuild extends PomBuild {
// TODO: Add Sql to mima checks
// TODO: remove launcher from this list after 1.3.
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
networkCommon, networkShuffle, networkYarn, launcher).contains(x)).foreach {
networkCommon, networkShuffle, networkYarn, launcher, unsafe).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}

@@ -496,6 +496,7 @@ object TestSettings {
javaOptions in Test += "-Dspark.ui.enabled=false",
javaOptions in Test += "-Dspark.ui.showConsoleProgress=false",
javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true",
javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
5 changes: 5 additions & 0 deletions sql/catalyst/pom.xml
@@ -50,6 +50,11 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-unsafe_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>