Skip to content

Commit 41a40c9

Browse files
committed
Merge remote-tracking branch 'origin/master' into newCodeGen
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
2 parents de22aac + a7d145e commit 41a40c9

File tree

82 files changed

+2244
-2304
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

82 files changed

+2244
-2304
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ unit-tests.log
5151
rat-results.txt
5252
scalastyle.txt
5353
conf/*.conf
54+
scalastyle-output.xml
5455

5556
# For Hive
5657
metastore_db/
5758
metastore/
5859
warehouse/
5960
TempStatsStore/
60-
sql/hive-thriftserver/test_warehouses

assembly/pom.xml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,6 @@
165165
</dependency>
166166
</dependencies>
167167
</profile>
168-
<profile>
169-
<id>hive-thriftserver</id>
170-
<dependencies>
171-
<dependency>
172-
<groupId>org.apache.spark</groupId>
173-
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
174-
<version>${project.version}</version>
175-
</dependency>
176-
</dependencies>
177-
</profile>
178168
<profile>
179169
<id>spark-ganglia-lgpl</id>
180170
<dependencies>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<groupId>org.apache.spark</groupId>
2929
<artifactId>spark-bagel_2.10</artifactId>
3030
<properties>
31-
<sbt.project.name>bagel</sbt.project.name>
31+
<sbt.project.name>bagel</sbt.project.name>
3232
</properties>
3333
<packaging>jar</packaging>
3434
<name>Spark Project Bagel</name>

bin/beeline

Lines changed: 0 additions & 45 deletions
This file was deleted.

bin/compute-classpath.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
5252
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
5353
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
5454
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
55-
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
5655
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
5756
fi
5857

bin/spark-shell

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ function main(){
4646
# (see https://github.com/sbt/sbt/issues/562).
4747
stty -icanon min 1 -echo > /dev/null 2>&1
4848
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
49-
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
49+
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
5050
stty icanon echo > /dev/null 2>&1
5151
else
5252
export SPARK_SUBMIT_OPTS
53-
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
53+
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
5454
fi
5555
}
5656

bin/spark-shell.cmd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ rem
1919

2020
set SPARK_HOME=%~dp0..
2121

22-
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
22+
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main

bin/spark-sql

Lines changed: 0 additions & 36 deletions
This file was deleted.

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<groupId>org.apache.spark</groupId>
2929
<artifactId>spark-core_2.10</artifactId>
3030
<properties>
31-
<sbt.project.name>core</sbt.project.name>
31+
<sbt.project.name>core</sbt.project.name>
3232
</properties>
3333
<packaging>jar</packaging>
3434
<name>Spark Project Core</name>

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.spark
1919

20-
import scala.collection.mutable.{ArrayBuffer, HashSet}
20+
import scala.collection.mutable
21+
import scala.collection.mutable.ArrayBuffer
2122

22-
import org.apache.spark.executor.InputMetrics
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.storage._
2525

@@ -30,7 +30,7 @@ import org.apache.spark.storage._
3030
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
3131

3232
/** Keys of RDD partitions that are being computed/loaded. */
33-
private val loading = new HashSet[RDDBlockId]()
33+
private val loading = new mutable.HashSet[RDDBlockId]
3434

3535
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
3636
def getOrCompute[T](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
118118
}
119119

120120
/**
121-
* Cache the values of a partition, keeping track of any updates in the storage statuses
122-
* of other blocks along the way.
121+
* Cache the values of a partition, keeping track of any updates in the storage statuses of
122+
* other blocks along the way.
123+
*
124+
* The effective storage level refers to the level that actually specifies BlockManager put
125+
* behavior, not the level originally specified by the user. This is mainly for forcing a
126+
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
127+
* while preserving the original semantics of the RDD as specified by the application.
123128
*/
124129
private def putInBlockManager[T](
125130
key: BlockId,
126131
values: Iterator[T],
127-
storageLevel: StorageLevel,
128-
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
129-
130-
if (!storageLevel.useMemory) {
131-
/* This RDD is not to be cached in memory, so we can just pass the computed values
132-
* as an iterator directly to the BlockManager, rather than first fully unrolling
133-
* it in memory. The latter option potentially uses much more memory and risks OOM
134-
* exceptions that can be avoided. */
135-
updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
132+
level: StorageLevel,
133+
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
134+
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
135+
136+
val putLevel = effectiveStorageLevel.getOrElse(level)
137+
if (!putLevel.useMemory) {
138+
/*
139+
* This RDD is not to be cached in memory, so we can just pass the computed values as an
140+
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
141+
*/
142+
updatedBlocks ++=
143+
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
136144
blockManager.get(key) match {
137145
case Some(v) => v.data.asInstanceOf[Iterator[T]]
138146
case None =>
139147
logInfo(s"Failure to store $key")
140148
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
141149
}
142150
} else {
143-
/* This RDD is to be cached in memory. In this case we cannot pass the computed values
151+
/*
152+
* This RDD is to be cached in memory. In this case we cannot pass the computed values
144153
* to the BlockManager as an iterator and expect to read it back later. This is because
145-
* we may end up dropping a partition from memory store before getting it back, e.g.
146-
* when the entirety of the RDD does not fit in memory. */
147-
val elements = new ArrayBuffer[Any]
148-
elements ++= values
149-
updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
150-
elements.iterator.asInstanceOf[Iterator[T]]
154+
* we may end up dropping a partition from memory store before getting it back.
155+
*
156+
* In addition, we must be careful to not unroll the entire partition in memory at once.
157+
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
158+
* single partition. Instead, we unroll the values cautiously, potentially aborting and
159+
* dropping the partition to disk if applicable.
160+
*/
161+
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
162+
case Left(arr) =>
163+
// We have successfully unrolled the entire partition, so cache it in memory
164+
updatedBlocks ++=
165+
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
166+
arr.iterator.asInstanceOf[Iterator[T]]
167+
case Right(it) =>
168+
// There is not enough space to cache this partition in memory
169+
logWarning(s"Not enough space to cache partition $key in memory! " +
170+
s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
171+
val returnValues = it.asInstanceOf[Iterator[T]]
172+
if (putLevel.useDisk) {
173+
logWarning(s"Persisting partition $key to disk instead.")
174+
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
175+
useOffHeap = false, deserialized = false, putLevel.replication)
176+
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
177+
} else {
178+
returnValues
179+
}
180+
}
151181
}
152182
}
153183

0 commit comments

Comments
 (0)