Commit 2c997c5

Merge branch 'master' of https://github.com/apache/spark
2 parents: 00bc819 + d3a302d

49 files changed: +2317, -1332 lines


R/pkg/R/DataFrame.R

Lines changed: 7 additions & 1 deletion
@@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),
 
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
-            stopifnot(class(value) == "Column")
+            stopifnot(class(value) == "Column" || is.null(value))
             cols <- columns(x)
             if (name %in% cols) {
+              if (is.null(value)) {
+                cols <- Filter(function(c) { c != name }, cols)
+              }
               cols <- lapply(cols, function(c) {
                 if (c == name) {
                   alias(value, name)
@@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
               })
               nx <- select(x, cols)
             } else {
+              if (is.null(value)) {
+                return(x)
+              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 5 additions & 0 deletions
@@ -449,6 +449,11 @@ test_that("select operators", {
   df$age2 <- df$age * 2
   expect_equal(columns(df), c("name", "age", "age2"))
   expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
+
+  df$age2 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+  df$age3 <- NULL
+  expect_equal(columns(df), c("name", "age"))
 })
 
 test_that("select with column", {
core/src/main/java/org/apache/spark/api/java/function/Function0.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A zero-argument function that returns an R.
+ */
+public interface Function0<R> extends Serializable {
+  public R call() throws Exception;
+}
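
For context, a minimal sketch of how this new zero-argument interface can be implemented and invoked from Scala; the object and value names below are illustrative, not part of the commit.

import org.apache.spark.api.java.function.Function0

object Function0Sketch {
  // Anonymous implementation of the new interface; call() takes no arguments and returns an R.
  val makeGreeting: Function0[String] = new Function0[String] {
    override def call(): String = "hello from a zero-argument function"
  }

  def main(args: Array[String]): Unit = {
    println(makeGreeting.call())  // prints the string produced by call()
  }
}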

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 16 additions & 6 deletions
@@ -77,12 +77,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
       Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
-        if (k.startsWith("spark.")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
+        defaultProperties(k) = v
+        if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
       }
     }
     defaultProperties
@@ -97,6 +93,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     }
     // Populate `sparkProperties` map from properties file
     mergeDefaultSparkProperties()
+    // Remove keys that don't start with "spark." from `sparkProperties`.
+    ignoreNonSparkProperties()
     // Use `sparkProperties` map along with env vars to fill in any missing parameters
     loadEnvironmentArguments()
 
@@ -117,6 +115,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     }
   }
 
+  /**
+   * Remove keys that don't start with "spark." from `sparkProperties`.
+   */
+  private def ignoreNonSparkProperties(): Unit = {
+    sparkProperties.foreach { case (k, v) =>
+      if (!k.startsWith("spark.")) {
+        sparkProperties -= k
+        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+      }
+    }
+  }
+
   /**
    * Load arguments from environment variables, Spark properties etc.
   */
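
To illustrate the new two-step flow above (merge every key from the properties file first, then warn about and drop the non-"spark." keys), here is a hedged, standalone Scala sketch. The map contents and object name are hypothetical, and it snapshots the key set before removing entries rather than touching the real Spark class.

import scala.collection.mutable

object NonSparkPropertyFilterSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical contents of the merged defaults after mergeDefaultSparkProperties().
    val sparkProperties = mutable.HashMap(
      "spark.master" -> "local[2]",
      "spark.executor.memory" -> "2g",
      "java.home" -> "/usr/lib/jvm/default")

    // Mirror ignoreNonSparkProperties(): warn about and drop keys not starting with "spark.".
    // A snapshot of the keys is taken so removal does not interfere with iteration.
    sparkProperties.keys.toList.foreach { k =>
      if (!k.startsWith("spark.")) {
        println(s"Warning: Ignoring non-spark config property: $k=${sparkProperties(k)}")
        sparkProperties -= k
      }
    }

    println(sparkProperties)  // only the spark.* entries remain
  }
}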

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 20 additions & 12 deletions
@@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
-
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
@@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  // List of applications to be deleted by event log cleaner.
+  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
   private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def cleanLogs(): Unit = {
     try {
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq[FileStatus]())
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      // Scan all logs from the log directory.
+      // Only completed applications older than the specified max age will be deleted.
       applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge) {
+        if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
+        } else {
+          appsToClean += info
         }
       }
 
       applications = appsToRetain
 
-      // Scan all logs from the log directory.
-      // Only directories older than the specified max age will be deleted
-      statusList.foreach { dir =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+      appsToClean.foreach { info =>
         try {
-          if (now - dir.getModificationTime() > maxAge) {
-            // if path is a directory and set to true,
-            // the directory is deleted else throws an exception
-            fs.delete(dir.getPath, true)
+          val path = new Path(logDir, info.logPath)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
           }
         } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case e: AccessControlException =>
+            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+          case t: IOException =>
+            logError(s"IOException in cleaning logs of ${info.logPath}", t)
+            leftToClean += info
         }
       }
+
+      appsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 6 additions & 2 deletions
@@ -220,8 +220,12 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
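
As a quick check on the accounting change above, the sketch below plugs in made-up timestamps: the deserialization time measured inside runTask() (see the ResultTask and ShuffleMapTask diffs below) is added to the deserialize metric and subtracted from the run metric, so the two metrics still cover the interval from deserializeStartTime to taskFinish exactly once.

object DeserializeAccountingSketch {
  def main(args: Array[String]): Unit = {
    // Made-up timestamps in milliseconds, purely for illustration.
    val deserializeStartTime = 1000L   // executor begins deserializing the Task object
    val taskStart = 1040L              // Task.run() begins
    val taskDeserializeInsideRun = 25L // portion measured inside runTask()
    val taskFinish = 1140L

    val executorDeserializeTime = (taskStart - deserializeStartTime) + taskDeserializeInsideRun // 65
    val executorRunTime = (taskFinish - taskStart) - taskDeserializeInsideRun                   // 75

    // No double-counting: the two metrics sum to the total wall time.
    assert(executorDeserializeTime + executorRunTime == taskFinish - deserializeStartTime)
    println(s"deserialize=${executorDeserializeTime}ms run=${executorRunTime}ms")
  }
}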

core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

Lines changed: 2 additions & 0 deletions
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](
 
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 2 additions & 0 deletions
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 7 additions & 0 deletions
@@ -87,11 +87,18 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
 
+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed
 
+  /**
+   * Returns the amount of time spent deserializing the RDD and function to be run.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
   /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can

core/src/main/scala/org/apache/spark/ui/ToolTips.scala

Lines changed: 3 additions & 1 deletion
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
     scheduler delay is large, consider decreasing the size of tasks or decreasing the size
     of task results."""
 
-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+       broadcasted task."""
 
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."
