
Commit 380880f (2 parents: 0a827b3 + 163e3f1)

Merge remote-tracking branch 'upstream/master' into UDAF

Conflicts:
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
    sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

182 files changed (+8406, -2377 lines)


R/pkg/R/DataFrame.R

Lines changed: 5 additions & 5 deletions

@@ -1314,7 +1314,7 @@ setMethod("except",
 #' write.df(df, "myfile", "parquet", "overwrite")
 #' }
 setMethod("write.df",
-          signature(df = "DataFrame", path = 'character'),
+          signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "append", ...){
             if (is.null(source)) {
               sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1328,7 +1328,7 @@ setMethod("write.df",
             jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
             options <- varargsToEnv(...)
             if (!is.null(path)) {
-              options[['path']] <- path
+              options[["path"]] <- path
             }
             callJMethod(df@sdf, "save", source, jmode, options)
           })
@@ -1337,7 +1337,7 @@ setMethod("write.df",
 #' @aliases saveDF
 #' @export
 setMethod("saveDF",
-          signature(df = "DataFrame", path = 'character'),
+          signature(df = "DataFrame", path = "character"),
           function(df, path, source = NULL, mode = "append", ...){
             write.df(df, path, source, mode, ...)
           })
@@ -1375,8 +1375,8 @@ setMethod("saveDF",
 #' saveAsTable(df, "myfile")
 #' }
 setMethod("saveAsTable",
-          signature(df = "DataFrame", tableName = 'character', source = 'character',
-                    mode = 'character'),
+          signature(df = "DataFrame", tableName = "character", source = "character",
+                    mode = "character"),
           function(df, tableName, source = NULL, mode="append", ...){
             if (is.null(source)) {
               sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)

R/pkg/R/SQLContext.R

Lines changed: 2 additions & 2 deletions

@@ -457,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
 read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
-    options[['path']] <- path
+    options[["path"]] <- path
   }
   if (is.null(source)) {
     sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -506,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
 createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
-    options[['path']] <- path
+    options[["path"]] <- path
   }
   sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
   dataFrame(sdf)

R/pkg/R/serialize.R

Lines changed: 2 additions & 2 deletions

@@ -140,8 +140,8 @@ writeType <- function(con, class) {
     jobj = "j",
     environment = "e",
     Date = "D",
-    POSIXlt = 't',
-    POSIXct = 't',
+    POSIXlt = "t",
+    POSIXct = "t",
     stop(paste("Unsupported type for serialization", class)))
   writeBin(charToRaw(type), con)
 }

R/pkg/R/sparkR.R

Lines changed: 1 addition & 1 deletion

@@ -140,7 +140,7 @@ sparkR.init <- function(
     if (!file.exists(path)) {
       stop("JVM is not ready after 10 seconds")
     }
-    f <- file(path, open='rb')
+    f <- file(path, open="rb")
     backendPort <- readInt(f)
     monitorPort <- readInt(f)
     close(f)

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 2 additions & 2 deletions

@@ -57,9 +57,9 @@ test_that("infer types", {
   expect_equal(infer_type(as.Date("2015-03-11")), "date")
   expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
   expect_equal(infer_type(c(1L, 2L)),
-               list(type = 'array', elementType = "integer", containsNull = TRUE))
+               list(type = "array", elementType = "integer", containsNull = TRUE))
   expect_equal(infer_type(list(1L, 2L)),
-               list(type = 'array', elementType = "integer", containsNull = TRUE))
+               list(type = "array", elementType = "integer", containsNull = TRUE))
   testStruct <- infer_type(list(a = 1L, b = "2"))
   expect_equal(class(testStruct), "structType")
   checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ object TaskContext {
    */
   def getPartitionId(): Int = {
     val tc = taskContext.get()
-    if (tc == null) {
+    if (tc eq null) {
       0
     } else {
       tc.partitionId()
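
Note on the TaskContext hunk above (editorial reading, not stated in the commit): in Scala, `eq` is reference identity on AnyRef, while `==` is value equality defined in terms of `equals` (with a built-in null guard), so `tc eq null` spells out that this is a pure identity check that can never be affected by an overridden `equals`. A minimal sketch, with `Ctx` as a hypothetical stand-in for TaskContext:

    // Sketch only: `Ctx` stands in for TaskContext; it is not Spark code.
    class Ctx

    object NullCheckDemo extends App {
      val missing: Ctx = null
      val present: Ctx = new Ctx

      // `eq` is reference identity, so these read as explicit "is this slot empty?" checks.
      println(missing eq null)  // true
      println(present eq null)  // false

      // `==` gives the same answer here, but it is defined through equals
      // and does not state "identity test" as directly as `eq` does.
      println(missing == null)  // true
    }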

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 18 additions & 11 deletions

@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
 import scala.collection.JavaConversions._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
       dir: Path,
       prefix: String,
       exclusionSuffix: String): Array[FileStatus] = {
-    val fileStatuses = remoteFs.listStatus(dir,
-      new PathFilter {
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+    try {
+      val fileStatuses = remoteFs.listStatus(dir,
+        new PathFilter {
+          override def accept(path: Path): Boolean = {
+            val name = path.getName
+            name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+          }
+        })
+      Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+        override def compare(o1: FileStatus, o2: FileStatus): Int = {
+          Longs.compare(o1.getModificationTime, o2.getModificationTime)
         }
       })
-    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-      override def compare(o1: FileStatus, o2: FileStatus): Int = {
-        Longs.compare(o1.getModificationTime, o2.getModificationTime)
-      }
-    })
-    fileStatuses
+      fileStatuses
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Error while attempting to list files from application staging dir", e)
+        Array.empty
+    }
   }
 
   /**
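
For readers unfamiliar with the pattern introduced above: `scala.util.control.NonFatal` matches ordinary exceptions but lets fatal ones (OutOfMemoryError, InterruptedException, LinkageError, and so on) propagate, so the listing failure is logged and turned into an empty result without masking a dying JVM. A self-contained sketch of the same wrap-and-fall-back shape, using a hypothetical `listNames` helper rather than Spark's `listFilesSorted`:

    import scala.util.control.NonFatal

    // Sketch only: a hypothetical directory-listing helper, not Spark's listFilesSorted.
    object SafeListing {
      def listNames(dir: java.io.File, prefix: String, exclusionSuffix: String): Array[String] = {
        try {
          dir.listFiles()                      // returns null if the directory cannot be read
            .map(_.getName)
            .filter(n => n.startsWith(prefix) && !n.endsWith(exclusionSuffix))
            .sorted
        } catch {
          // NonFatal skips OutOfMemoryError, InterruptedException, etc., so only
          // "ordinary" failures are swallowed and converted into an empty result.
          case NonFatal(e) =>
            System.err.println(s"Error while listing $dir: $e")
            Array.empty[String]
        }
      }

      def main(args: Array[String]): Unit = {
        // Non-existent directory: listFiles() yields null, the map call throws
        // a NullPointerException, and the catch above returns an empty array.
        println(listNames(new java.io.File("/no/such/dir"), "app", ".tmp").length)
      }
    }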

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 8 additions & 2 deletions

@@ -508,8 +508,14 @@ object SparkSubmit {
     }
 
     // Let YARN know it's a pyspark app, so it distributes needed libraries.
-    if (clusterManager == YARN && args.isPython) {
-      sysProps.put("spark.yarn.isPython", "true")
+    if (clusterManager == YARN) {
+      if (args.isPython) {
+        sysProps.put("spark.yarn.isPython", "true")
+      }
+      if (args.principal != null) {
+        require(args.keytab != null, "Keytab must be specified when the keytab is specified")
+        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+      }
     }
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
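
The new YARN branch above fails fast when a principal is supplied without a keytab and then logs in through Hadoop's UserGroupInformation.loginUserFromKeytab(principal, keytab) before anything is submitted. A sketch of that guard-then-login shape, with a stand-in for the Hadoop call so the snippet runs on its own (`loginFromKeytab`, `configureYarn`, and the argument names are illustrative, not Spark's):

    // Sketch only: mirrors the guard added above; loginFromKeytab is a stand-in
    // for UserGroupInformation.loginUserFromKeytab(principal, keytab).
    object KeytabGuardSketch {
      def loginFromKeytab(principal: String, keytab: String): Unit =
        println(s"would log in $principal using keytab at $keytab")

      def configureYarn(isPython: Boolean, principal: String, keytab: String): Map[String, String] = {
        var sysProps = Map.empty[String, String]
        if (isPython) {
          sysProps += ("spark.yarn.isPython" -> "true")
        }
        if (principal != null) {
          // Fail fast with a clear message instead of a later, harder-to-trace Kerberos error.
          require(keytab != null, "Keytab must be specified when principal is specified")
          loginFromKeytab(principal, keytab)
        }
        sysProps
      }

      def main(args: Array[String]): Unit =
        println(configureYarn(isPython = true, principal = "user@EXAMPLE.COM", keytab = "/tmp/user.keytab"))
    }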

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 3 additions & 7 deletions

@@ -407,8 +407,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Comparison function that defines the sort order for application attempts within the same
-   * application. Order is: running attempts before complete attempts, running attempts sorted
-   * by start time, completed attempts sorted by end time.
+   * application. Order is: attempts are sorted by descending start time.
+   * Most recent attempt state matches with current state of the app.
    *
    * Normally applications should have a single running attempt; but failure to call sc.stop()
    * may cause multiple running attempts to show up.
@@ -418,11 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def compareAttemptInfo(
       a1: FsApplicationAttemptInfo,
       a2: FsApplicationAttemptInfo): Boolean = {
-    if (a1.completed == a2.completed) {
-      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
-    } else {
-      !a1.completed
-    }
+    a1.startTime >= a2.startTime
   }
 
   /**
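
The rewritten compareAttemptInfo above returns true when the first attempt should sort before the second; comparing start times only means the most recently started attempt always comes first, regardless of completion state. A small sketch of that ordering (`Attempt` is a hypothetical stand-in for FsApplicationAttemptInfo):

    // Sketch only: `Attempt` stands in for FsApplicationAttemptInfo.
    case class Attempt(id: String, startTime: Long, completed: Boolean)

    object AttemptOrderingSketch {
      // Mirrors the new compareAttemptInfo: a1 sorts before a2 when it started later.
      def compareAttemptInfo(a1: Attempt, a2: Attempt): Boolean = a1.startTime >= a2.startTime

      def main(args: Array[String]): Unit = {
        val attempts = List(
          Attempt("attempt1", startTime = 1L, completed = false),
          Attempt("attempt3", startTime = 3L, completed = true),
          Attempt("attempt2", startTime = 2L, completed = false))

        // Most recently started attempt first: List(attempt3, attempt2, attempt1),
        // which is the ordering the updated FsHistoryProviderSuite asserts on.
        println(attempts.sortWith(compareAttemptInfo).map(_.id))
      }
    }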

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 11 additions & 13 deletions

@@ -243,13 +243,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     appListAfterRename.size should be (1)
   }
 
-  test("apps with multiple attempts") {
+  test("apps with multiple attempts with order") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
     writeFile(attempt1, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
-      SparkListenerApplicationEnd(2L)
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
       )
 
     updateAndCheck(provider) { list =>
@@ -259,7 +258,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
     writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
       )
 
     updateAndCheck(provider) { list =>
@@ -268,30 +267,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       list.head.attempts.head.attemptId should be (Some("attempt2"))
     }
 
-    val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
-    attempt2.delete()
-    writeFile(attempt2, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+    val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+    writeFile(attempt3, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
       SparkListenerApplicationEnd(4L)
       )
 
     updateAndCheck(provider) { list =>
       list should not be (null)
       list.size should be (1)
-      list.head.attempts.size should be (2)
-      list.head.attempts.head.attemptId should be (Some("attempt2"))
+      list.head.attempts.size should be (3)
+      list.head.attempts.head.attemptId should be (Some("attempt3"))
     }
 
     val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
-    writeFile(attempt2, true, None,
+    writeFile(attempt1, true, None,
       SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
       SparkListenerApplicationEnd(6L)
       )
 
     updateAndCheck(provider) { list =>
       list.size should be (2)
       list.head.attempts.size should be (1)
-      list.last.attempts.size should be (2)
+      list.last.attempts.size should be (3)
       list.head.attempts.head.attemptId should be (Some("attempt1"))
 
       list.foreach { case app =>
