2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
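Both test harnesses touched by this PR shrink `spark.buffer.pageSize` from its 64m default, presumably to keep per-task page allocations small in memory-constrained test JVMs. A minimal sketch of how such a size property is parsed, assuming only the public `SparkConf` API (the suffixed spellings `4m` and `4mb` are equivalent):

```scala
import org.apache.spark.SparkConf

// Size properties accept suffixed strings; "4m" and "4mb" both mean 4 MiB.
val conf = new SparkConf().set("spark.buffer.pageSize", "4m")
val pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
assert(pageSizeBytes == 4L * 1024 * 1024)
```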
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.util.LinkedList;

import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public UnsafeExternalSorter(
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
freeMemory();
return null;
}
});
}

// TODO: metrics tracking + integration with shuffle write metrics
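The Java constructor above has to wrap its callback in `scala.runtime.AbstractFunction0` because `addOnCompleteCallback` is a Scala API. A minimal Scala sketch of the same guarantee, using the equivalent `addTaskCompletionListener` hook; the `PageHoldingSorter` class and its `pages` field are hypothetical stand-ins:

```scala
import org.apache.spark.TaskContext

// Hypothetical sorter skeleton: pages stands in for real memory pages.
class PageHoldingSorter(taskContext: TaskContext) {
  private var pages: List[Array[Byte]] = Nil

  // The listener fires when the task ends, whether or not a downstream
  // operator (e.g. Limit) exhausted the sorter's iterator.
  taskContext.addTaskCompletionListener { (_: TaskContext) =>
    freeMemory()
  }

  def freeMemory(): Unit = { pages = Nil }
}
```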
6 changes: 5 additions & 1 deletion python/pyspark/java_gateway.py
@@ -52,7 +52,11 @@ def launch_gateway():
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
if os.environ.get("SPARK_TESTING"):
submit_args = "--conf spark.ui.enabled=false " + submit_args
submit_args = ' '.join([
"--conf spark.ui.enabled=false",
"--conf spark.buffer.pageSize=4mb",
submit_args
])
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)

# Start a socket that will be used by PythonGatewayServer to communicate its port to us
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -229,7 +229,7 @@ private[spark] object SQLConf {
" a specific query.")

val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
defaultValue = Some(false),
defaultValue = Some(true),
doc = "When true, use the new optimized Tungsten physical execution backend.")

val DIALECT = stringConf(
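With the default flipped to true, the Tungsten backend can still be disabled per session, which is useful for comparing the two code paths. A hedged sketch, assuming a `SQLContext` named `sqlContext` and a registered temp table `t`:

```scala
// Run the same query on the safe path and the Tungsten path.
sqlContext.setConf("spark.sql.unsafe.enabled", "false") // safe path
val safeRows = sqlContext.sql("SELECT a FROM t ORDER BY a").collect()

sqlContext.setConf("spark.sql.unsafe.enabled", "true")  // Tungsten path (new default)
val unsafeRows = sqlContext.sql("SELECT a FROM t ORDER BY a").collect()

assert(safeRows.sameElements(unsafeRows)) // both backends must agree
```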
sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,7 +47,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una

override def canProcessSafeRows: Boolean = true

override def canProcessUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = {
// Do not use the Unsafe path if we are using a RangePartitioning, since this may lead to
// an interpreted RowOrdering being applied to an UnsafeRow, which will lead to
// ClassCastExceptions at runtime. This check can be removed after SPARK-9054 is fixed.
[Review comment from the contributor author]: This may be unnecessary now that we support generic getters in UnsafeRows.

!newPartitioning.isInstanceOf[RangePartitioning]
}

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
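The guard is a plain type test on the output partitioning. As a standalone sketch, assuming the catalyst physical-plan types this file already works with (`unsafeRowsSupported` is a hypothetical helper name):

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning}

// Route RangePartitioning to the safe path: its interpreted row ordering
// cannot be applied to an UnsafeRow until SPARK-9054 is fixed.
def unsafeRowsSupported(partitioning: Partitioning): Boolean = partitioning match {
  case _: RangePartitioning => false
  case _                    => true
}
```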
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql

import org.scalatest.Matchers._

import org.apache.spark.sql.execution.Project
import org.apache.spark.sql.execution.{Project, TungstenProject}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.SQLTestUtils
@@ -538,6 +538,7 @@ class ColumnExpressionSuite extends QueryTest with SQLTestUtils {
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
val projects = df.queryExecution.executedPlan.collect {
case project: Project => project
case tungstenProject: TungstenProject => tungstenProject
}
assert(projects.size === expectedNumProjects)
}
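The updated `checkNumProjects` helper leans on `executedPlan.collect`, which traverses the whole physical-plan tree. A compact sketch of the same counting logic, assuming the Spark 1.5-era operators imported in this diff:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{Project, TungstenProject}

// collect visits every node of the physical plan and keeps those matched by
// the partial function, so projections from either backend are counted.
def numProjects(df: DataFrame): Int =
  df.queryExecution.executedPlan.collect {
    case p: Project          => p
    case tp: TungstenProject => tp
  }.size
```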
sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -36,10 +36,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.conf.setConf(SQLConf.CODEGEN_ENABLED, SQLConf.CODEGEN_ENABLED.defaultValue.get)
}

ignore("sort followed by limit should not leak memory") {
// TODO: this test is going to fail until we implement a proper iterator interface
// with a close() method.
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
test("sort followed by limit") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
@@ -48,21 +45,6 @@
)
}

test("sort followed by limit") {
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
try {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
(child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
} finally {
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")

}
}

test("sorting does not crash for large inputs") {
val sortOrder = 'a.asc :: Nil
val stringLength = 1024 * 1024 * 2