4 changes: 3 additions & 1 deletion bin/pyspark
@@ -54,9 +54,11 @@ elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

WORKS_WITH_IPYTHON=$($DEFAULT_PYTHON -c 'import sys; print(sys.version_info >= (2, 7, 0))')

# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && ! $WORKS_WITH_IPYTHON ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
exit 1
else
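Here WORKS_WITH_IPYTHON captures the True/False string that the default Python prints for sys.version_info >= (2, 7, 0), so the IPython guard now keys off the interpreter's actual version rather than requiring the executable to be named python2.7.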
2 changes: 1 addition & 1 deletion build/mvn
@@ -69,7 +69,7 @@ install_app() {

# Install maven under the build/ folder
install_mvn() {
local MVN_VERSION="3.3.3"
local MVN_VERSION="3.3.9"

install_app \
"https://www.apache.org/dyn/closer.lua?action=download&filename=/maven/maven-3/${MVN_VERSION}/binaries" \
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -134,6 +134,12 @@ class NewHadoopRDD[K, V](
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Sets the thread local variable for the file's name
split.serializableHadoopSplit.value match {
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
@@ -190,6 +196,7 @@ class NewHadoopRDD[K, V](

private def close() {
if (reader != null) {
SqlNewHadoopRDDState.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
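Setting the thread-local file name in compute() and clearing it in close() is what lets input_file_name() report the current file for rows read through NewHadoopRDD; the new ColumnExpressionSuite tests below exercise exactly this path.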
core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -64,18 +64,18 @@ private[spark] object TaskLocation {

/**
* Create a TaskLocation from a string returned by getPreferredLocations.
* These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the
* location is cached.
* These strings have the form executor_[hostname]_[executorid], [hostname], or
* hdfs_cache_[hostname], depending on whether the location is cached.
*/
def apply(str: String): TaskLocation = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {
if (str.startsWith(executorLocationTag)) {
val splits = str.split("_")
if (splits.length != 3) {
throw new IllegalArgumentException("Illegal executor location format: " + str)
}
new ExecutorCacheTaskLocation(splits(1), splits(2))
val hostAndExecutorId = str.stripPrefix(executorLocationTag)
val splits = hostAndExecutorId.split("_", 2)
require(splits.length == 2, "Illegal executor location format: " + str)
val Array(host, executorId) = splits
new ExecutorCacheTaskLocation(host, executorId)
} else {
new HostTaskLocation(str)
}
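For reference, a minimal sketch of how the reworked parser treats the three string forms described in the scaladoc above (scheduler-internal API; the inputs are hypothetical):

import org.apache.spark.scheduler.TaskLocation

TaskLocation("host1")               // HostTaskLocation("host1")
TaskLocation("hdfs_cache_host1")    // HDFSCacheTaskLocation("host1")
TaskLocation("executor_host1_3")    // ExecutorCacheTaskLocation("host1", "3")
// split("_", 2) only splits on the first underscore after the prefix, so an
// executor id that itself contains underscores is preserved:
TaskLocation("executor_some.host1_executor_task_3")
                                    // ExecutorCacheTaskLocation("some.host1", "executor_task_3")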
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -393,7 +393,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
blockId, valuesOrBytes, Utils.bytesToString(size),
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
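For example, with maxMemory of 1000 MB and 300 MB of blocks stored, the log line now reports roughly 700 MB free instead of echoing the 300 MB already in use.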
76 changes: 46 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1732,50 +1732,66 @@ private[spark] object Utils extends Logging {
}

/**
* Terminates a process waiting for at most the specified duration. Returns whether
* the process terminated.
* Terminates a process waiting for at most the specified duration.
*
* @return the process exit value if it was successfully terminated, else None
*/
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
try {
// Java8 added a new API which will more forcibly kill the process. Use that if available.
val destroyMethod = process.getClass().getMethod("destroyForcibly");
destroyMethod.setAccessible(true)
destroyMethod.invoke(process)
} catch {
case NonFatal(e) =>
if (!e.isInstanceOf[NoSuchMethodException]) {
logWarning("Exception when attempting to kill process", e)
}
process.destroy()
}
// Politely destroy first
process.destroy()

if (waitForProcess(process, timeoutMs)) {
// Successful exit
Option(process.exitValue())
} else {
None
// Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
classOf[Process].getMethod("destroyForcibly").invoke(process)
} catch {
case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
if (waitForProcess(process, timeoutMs)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
None
}
}
}

/**
* Wait for a process to terminate for at most the specified duration.
* Return whether the process actually terminated after the given timeout.
*
* @return whether the process actually terminated before the given timeout.
*/
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
try {
// Use Java 8 method if available
classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
.invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
.asInstanceOf[Boolean]
} catch {
case _: NoSuchMethodException =>
// Otherwise implement it manually
var terminated = false
val startTime = System.currentTimeMillis
while (!terminated) {
try {
process.exitValue()
terminated = true
} catch {
case e: IllegalThreadStateException =>
// Process not terminated yet
if (System.currentTimeMillis - startTime > timeoutMs) {
return false
}
Thread.sleep(100)
}
Thread.sleep(100)
}
}
true
}
true
}

/**
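For context, a minimal usage sketch of the reworked helpers (Spark-internal API; the command and timeout are only illustrative):

import org.apache.spark.util.Utils

val process = new ProcessBuilder("sleep", "60").start()
// destroy() politely first, then fall back to Java 8's destroyForcibly (via reflection)
// if the process is still alive after the timeout.
val exitValue: Option[Int] = Utils.terminateProcess(process, 5000)
// Delegates to Process.waitFor(timeout, unit) on Java 8, otherwise polls exitValue().
val exited: Boolean = Utils.waitForProcess(process, 5000)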
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -784,6 +784,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(TaskLocation("host1") === HostTaskLocation("host1"))
assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
assert(TaskLocation("executor_some.host1_executor_task_3") ===
ExecutorCacheTaskLocation("some.host1", "executor_task_3"))
}

def createTaskResult(id: Int): DirectTaskResult[Int] = {
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -831,7 +831,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(terminated.isDefined)
Utils.waitForProcess(process, 5000)
val duration = System.currentTimeMillis() - start
assert(duration < 5000)
assert(duration < 6000) // add a little extra time to allow a force kill to finish
assert(!pidExists(pid))
} finally {
signal(pid, "SIGKILL")
8 changes: 6 additions & 2 deletions ec2/spark_ec2.py
@@ -51,7 +51,7 @@
raw_input = input
xrange = range

SPARK_EC2_VERSION = "1.6.1"
SPARK_EC2_VERSION = "1.6.3"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
@@ -77,6 +77,8 @@
"1.5.2",
"1.6.0",
"1.6.1",
"1.6.2",
"1.6.3",
])

SPARK_TACHYON_MAP = {
@@ -96,14 +98,16 @@
"1.5.2": "0.7.1",
"1.6.0": "0.8.2",
"1.6.1": "0.8.2",
"1.6.2": "0.8.2",
"1.6.3": "0.8.2",
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
DEFAULT_SPARK_GITHUB_REPO = "https://github.com/apache/spark"

# Default location to get the spark-ec2 scripts (and ami-list) from
DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.5"
DEFAULT_SPARK_EC2_BRANCH = "branch-1.6"


def setup_external_libs(libs):
3 changes: 1 addition & 2 deletions project/SparkBuild.scala
@@ -569,8 +569,7 @@ object Unidoc {
ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value)
},

// Javadoc options: create a window title, and group key packages on index page
javacOptions in doc := Seq(
javacOptions in (JavaUnidoc, unidoc) := Seq(
"-windowtitle", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " JavaDoc",
"-public",
"-group", "Core Java API", packageList("api.java", "api.java.function"),
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -180,23 +180,20 @@ abstract class Star extends LeafExpression with NamedExpression {
case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {

override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
// If there is no table specified, use all input attributes.
if (target.isEmpty) return input.output

// First try to expand assuming it is table.*.
val expandedAttributes: Seq[Attribute] = target match {
// If there is no table specified, use all input attributes.
case None => input.output
// If there is a table, pick out attributes that are part of this table.
case Some(t) => if (t.size == 1) {
input.output.filter(_.qualifiers.exists(resolver(_, t.head)))
val expandedAttributes =
if (target.get.size == 1) {
// If there is a table, pick out attributes that are part of this table.
input.output.filter(_.qualifiers.exists(resolver(_, target.get.head)))
} else {
List()
}
}
if (expandedAttributes.nonEmpty) return expandedAttributes

// Try to resolve it as a struct expansion. If there is a conflict and both are possible,
// (i.e. [name].* is both a table and a struct), the struct path can always be qualified.
require(target.isDefined)
val attribute = input.resolve(target.get, resolver)
if (attribute.isDefined) {
// This target resolved to an attribute in child. It must be a struct. Expand it.
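For reference, a minimal sketch of the expansion paths the rewritten expand() distinguishes, assuming a hypothetical registered table t with an ordinary column a and a struct column s having fields x and y:

sqlContext.sql("SELECT * FROM t")    // empty target: all input attributes -> a, s
sqlContext.sql("SELECT t.* FROM t")  // single-part target matches a table qualifier -> t.a, t.s
sqlContext.sql("SELECT s.* FROM t")  // no table qualifier matches, so s is resolved as a
                                     // struct and expanded to its fields -> s.x, s.y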
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -17,9 +17,11 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.scalatest.Matchers._

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.Project
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -591,15 +593,44 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
)
}

test("InputFileName") {
test("InputFileName - SqlNewHadoopRDD") {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10).toDF("id")
data.write.parquet(dir.getCanonicalPath)
val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName())
val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(input_file_name())
.head.getString(0)
assert(answer.contains(dir.getCanonicalPath))

checkAnswer(data.select(inputFileName()).limit(1), Row(""))
checkAnswer(data.select(input_file_name()).limit(1), Row(""))
}
}

test("input_file_name - HadoopRDD") {
withTempPath { dir =>
val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF()
data.write.text(dir.getCanonicalPath)
val df = sparkContext.textFile(dir.getCanonicalPath).toDF()
val answer = df.select(input_file_name()).head.getString(0)
assert(answer.contains(dir.getCanonicalPath))

checkAnswer(data.select(input_file_name()).limit(1), Row(""))
}
}

test("input_file_name - NewHadoopRDD") {
withTempPath { dir =>
val data = sparkContext.parallelize((0 to 10).map(_.toString)).toDF()
data.write.text(dir.getCanonicalPath)
val rdd = sparkContext.newAPIHadoopFile(
dir.getCanonicalPath,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text])
val df = rdd.map(pair => pair._2.toString).toDF()
val answer = df.select(input_file_name()).head.getString(0)
assert(answer.contains(dir.getCanonicalPath))

checkAnswer(data.select(input_file_name()).limit(1), Row(""))
}
}

31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1950,6 +1950,37 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("Star Expansion - table with zero column") {
withTempTable("temp_table_no_cols") {
val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
val dfNoCols = sqlContext.createDataFrame(rddNoCols, StructType(Seq.empty))
dfNoCols.registerTempTable("temp_table_no_cols")

// ResolvedStar
checkAnswer(
dfNoCols,
dfNoCols.select(dfNoCols.col("*")))

// UnresolvedStar
checkAnswer(
dfNoCols,
sql("SELECT * FROM temp_table_no_cols"))
checkAnswer(
dfNoCols,
dfNoCols.select($"*"))

var e = intercept[AnalysisException] {
sql("SELECT a.* FROM temp_table_no_cols a")
}.getMessage
assert(e.contains("cannot resolve 'a.*' give input columns ''"))

e = intercept[AnalysisException] {
dfNoCols.select($"b.*")
}.getMessage
assert(e.contains("cannot resolve 'b.*' give input columns ''"))
}
}

test("Common subexpression elimination") {
// select from a table to prevent constant folding.
val df = sql("SELECT a, b from testData2 limit 1")